##// END OF EJS Templates
wireproto: implement command executor interface for version 1 peers...
Gregory Szorc -
r37648:e1b32dc4 default
parent child Browse files
Show More
@@ -11,6 +11,7 b' import errno'
11 import hashlib
11 import hashlib
12 import os
12 import os
13 import random
13 import random
14 import sys
14 import time
15 import time
15 import weakref
16 import weakref
16
17
@@ -167,6 +168,49 b' class localiterbatcher(wireprotov1peer.i'
167 resref.set(getattr(self.local, name)(*args, **opts))
168 resref.set(getattr(self.local, name)(*args, **opts))
168 yield resref.value
169 yield resref.value
169
170
171 @zi.implementer(repository.ipeercommandexecutor)
172 class localcommandexecutor(object):
173 def __init__(self, peer):
174 self._peer = peer
175 self._sent = False
176 self._closed = False
177
178 def __enter__(self):
179 return self
180
181 def __exit__(self, exctype, excvalue, exctb):
182 self.close()
183
184 def callcommand(self, command, args):
185 if self._sent:
186 raise error.ProgrammingError('callcommand() cannot be used after '
187 'sendcommands()')
188
189 if self._closed:
190 raise error.ProgrammingError('callcommand() cannot be used after '
191 'close()')
192
193 # We don't need to support anything fancy. Just call the named
194 # method on the peer and return a resolved future.
195 fn = getattr(self._peer, pycompat.sysstr(command))
196
197 f = pycompat.futures.Future()
198
199 try:
200 result = fn(**args)
201 except Exception:
202 f.set_exception_info(*sys.exc_info()[1:])
203 else:
204 f.set_result(result)
205
206 return f
207
208 def sendcommands(self):
209 self._sent = True
210
211 def close(self):
212 self._closed = True
213
170 class localpeer(repository.peer):
214 class localpeer(repository.peer):
171 '''peer for a local repo; reflects only the most recent API'''
215 '''peer for a local repo; reflects only the most recent API'''
172
216
@@ -286,6 +330,9 b' class localpeer(repository.peer):'
286
330
287 # Begin of peer interface.
331 # Begin of peer interface.
288
332
333 def commandexecutor(self):
334 return localcommandexecutor(self)
335
289 def iterbatch(self):
336 def iterbatch(self):
290 return localiterbatcher(self)
337 return localiterbatcher(self)
291
338
@@ -278,7 +278,8 b' class ipeerrequests(zi.Interface):'
278 being issued.
278 being issued.
279 """
279 """
280
280
281 class ipeerbase(ipeerconnection, ipeercapabilities, ipeercommands):
281 class ipeerbase(ipeerconnection, ipeercapabilities, ipeercommands,
282 ipeerrequests):
282 """Unified interface for peer repositories.
283 """Unified interface for peer repositories.
283
284
284 All peer instances must conform to this interface.
285 All peer instances must conform to this interface.
@@ -228,7 +228,12 b' def findcommonheads(ui, local, remote,'
228 % (roundtrips, len(undecided), len(sample)))
228 % (roundtrips, len(undecided), len(sample)))
229 # indices between sample and externalized version must match
229 # indices between sample and externalized version must match
230 sample = list(sample)
230 sample = list(sample)
231 yesno = remote.known(dag.externalizeall(sample))
231
232 with remote.commandexecutor() as e:
233 yesno = e.callcommand('known', {
234 'nodes': dag.externalizeall(sample),
235 }).result()
236
232 full = True
237 full = True
233
238
234 if sample:
239 if sample:
@@ -8,12 +8,15 b''
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import sys
11
12
12 from .i18n import _
13 from .i18n import _
13 from .node import (
14 from .node import (
14 bin,
15 bin,
15 )
16 )
16
17 from .thirdparty.zope import (
18 interface as zi,
19 )
17 from . import (
20 from . import (
18 bundle2,
21 bundle2,
19 changegroup as changegroupmod,
22 changegroup as changegroupmod,
@@ -177,6 +180,93 b' def encodebatchcmds(req):'
177
180
178 return ';'.join(cmds)
181 return ';'.join(cmds)
179
182
183 @zi.implementer(repository.ipeercommandexecutor)
184 class peerexecutor(object):
185 def __init__(self, peer):
186 self._peer = peer
187 self._sent = False
188 self._closed = False
189 self._calls = []
190
191 def __enter__(self):
192 return self
193
194 def __exit__(self, exctype, excvalee, exctb):
195 self.close()
196
197 def callcommand(self, command, args):
198 if self._sent:
199 raise error.ProgrammingError('callcommand() cannot be used '
200 'after commands are sent')
201
202 if self._closed:
203 raise error.ProgrammingError('callcommand() cannot be used '
204 'after close()')
205
206 # Commands are dispatched through methods on the peer.
207 fn = getattr(self._peer, pycompat.sysstr(command), None)
208
209 if not fn:
210 raise error.ProgrammingError(
211 'cannot call command %s: method of same name not available '
212 'on peer' % command)
213
214 # Commands are either batchable or they aren't. If a command
215 # isn't batchable, we send it immediately because the executor
216 # can no longer accept new commands after a non-batchable command.
217 # If a command is batchable, we queue it for later.
218
219 if getattr(fn, 'batchable', False):
220 pass
221 else:
222 if self._calls:
223 raise error.ProgrammingError(
224 '%s is not batchable and cannot be called on a command '
225 'executor along with other commands' % command)
226
227 # We don't support batching yet. So resolve it immediately.
228 f = pycompat.futures.Future()
229 self._calls.append((command, args, fn, f))
230 self.sendcommands()
231 return f
232
233 def sendcommands(self):
234 if self._sent:
235 return
236
237 if not self._calls:
238 return
239
240 self._sent = True
241
242 calls = self._calls
243 # Mainly to destroy references to futures.
244 self._calls = None
245
246 if len(calls) == 1:
247 command, args, fn, f = calls[0]
248
249 # Future was cancelled. Ignore it.
250 if not f.set_running_or_notify_cancel():
251 return
252
253 try:
254 result = fn(**pycompat.strkwargs(args))
255 except Exception:
256 f.set_exception_info(*sys.exc_info()[1:])
257 else:
258 f.set_result(result)
259
260 return
261
262 raise error.ProgrammingError('support for multiple commands not '
263 'yet implemented')
264
265 def close(self):
266 self.sendcommands()
267
268 self._closed = True
269
180 class wirepeer(repository.legacypeer):
270 class wirepeer(repository.legacypeer):
181 """Client-side interface for communicating with a peer repository.
271 """Client-side interface for communicating with a peer repository.
182
272
@@ -185,6 +275,9 b' class wirepeer(repository.legacypeer):'
185 See also httppeer.py and sshpeer.py for protocol-specific
275 See also httppeer.py and sshpeer.py for protocol-specific
186 implementations of this interface.
276 implementations of this interface.
187 """
277 """
278 def commandexecutor(self):
279 return peerexecutor(self)
280
188 # Begin of ipeercommands interface.
281 # Begin of ipeercommands interface.
189
282
190 def iterbatch(self):
283 def iterbatch(self):
@@ -23,6 +23,7 b' from mercurial import ('
23 vfs as vfsmod,
23 vfs as vfsmod,
24 wireprotoserver,
24 wireprotoserver,
25 wireprototypes,
25 wireprototypes,
26 wireprotov1peer,
26 wireprotov2server,
27 wireprotov2server,
27 )
28 )
28
29
@@ -102,6 +103,14 b' def main():'
102 localrepo.localpeer)
103 localrepo.localpeer)
103 checkzobject(localrepo.localpeer(dummyrepo()))
104 checkzobject(localrepo.localpeer(dummyrepo()))
104
105
106 ziverify.verifyClass(repository.ipeercommandexecutor,
107 localrepo.localcommandexecutor)
108 checkzobject(localrepo.localcommandexecutor(None))
109
110 ziverify.verifyClass(repository.ipeercommandexecutor,
111 wireprotov1peer.peerexecutor)
112 checkzobject(wireprotov1peer.peerexecutor(None))
113
105 ziverify.verifyClass(repository.ipeerbaselegacycommands,
114 ziverify.verifyClass(repository.ipeerbaselegacycommands,
106 sshpeer.sshv1peer)
115 sshpeer.sshv1peer)
107 checkzobject(sshpeer.sshv1peer(ui, 'ssh://localhost/foo', None, dummypipe(),
116 checkzobject(sshpeer.sshv1peer(ui, 'ssh://localhost/foo', None, dummypipe(),
General Comments 0
You need to be logged in to leave comments. Login now