##// END OF EJS Templates
wireproto: remove iterbatch() from peer interface (API)...
Gregory Szorc -
r37651:33a6eee0 default
parent child Browse files
Show More
@@ -66,7 +66,6 b' from . import ('
66 66 txnutil,
67 67 util,
68 68 vfs as vfsmod,
69 wireprotov1peer,
70 69 )
71 70 from .utils import (
72 71 procutil,
@@ -154,20 +153,6 b" moderncaps = {'lookup', 'branchmap', 'pu"
154 153 'unbundle'}
155 154 legacycaps = moderncaps.union({'changegroupsubset'})
156 155
157 class localiterbatcher(wireprotov1peer.iterbatcher):
158 def __init__(self, local):
159 super(localiterbatcher, self).__init__()
160 self.local = local
161
162 def submit(self):
163 # submit for a local iter batcher is a noop
164 pass
165
166 def results(self):
167 for name, args, opts, resref in self.calls:
168 resref.set(getattr(self.local, name)(*args, **opts))
169 yield resref.value
170
171 156 @zi.implementer(repository.ipeercommandexecutor)
172 157 class localcommandexecutor(object):
173 158 def __init__(self, peer):
@@ -333,9 +318,6 b' class localpeer(repository.peer):'
333 318 def commandexecutor(self):
334 319 return localcommandexecutor(self)
335 320
336 def iterbatch(self):
337 return localiterbatcher(self)
338
339 321 # End of peer interface.
340 322
341 323 class locallegacypeer(repository.legacypeer, localpeer):
@@ -284,29 +284,6 b' class ipeerbase(ipeerconnection, ipeerca'
284 284
285 285 All peer instances must conform to this interface.
286 286 """
287 def iterbatch():
288 """Obtain an object to be used for multiple method calls.
289
290 Various operations call several methods on peer instances. If each
291 method call were performed immediately and serially, this would
292 require round trips to remote peers and/or would slow down execution.
293
294 Some peers have the ability to "batch" method calls to avoid costly
295 round trips or to facilitate concurrent execution.
296
297 This method returns an object that can be used to indicate intent to
298 perform batched method calls.
299
300 The returned object is a proxy of this peer. It intercepts calls to
301 batchable methods and queues them instead of performing them
302 immediately. This proxy object has a ``submit`` method that will
303 perform all queued batchable method calls. A ``results()`` method
304 exposes the results of queued/batched method calls. It is a generator
305 of results in the order they were called.
306
307 Not all peers or wire protocol implementations may actually batch method
308 calls. However, they must all support this API.
309 """
310 287
311 288 class ipeerbaselegacycommands(ipeerbase, ipeerlegacycommands):
312 289 """Unified peer interface that supports legacy commands."""
@@ -73,97 +73,6 b' class future(object):'
73 73 raise error.RepoError("future is already set")
74 74 self.value = value
75 75
76 class batcher(object):
77 '''base class for batches of commands submittable in a single request
78
79 All methods invoked on instances of this class are simply queued and
80 return a a future for the result. Once you call submit(), all the queued
81 calls are performed and the results set in their respective futures.
82 '''
83 def __init__(self):
84 self.calls = []
85 def __getattr__(self, name):
86 def call(*args, **opts):
87 resref = future()
88 # Please don't invent non-ascii method names, or you will
89 # give core hg a very sad time.
90 self.calls.append((name.encode('ascii'), args, opts, resref,))
91 return resref
92 return call
93 def submit(self):
94 raise NotImplementedError()
95
96 class iterbatcher(batcher):
97
98 def submit(self):
99 raise NotImplementedError()
100
101 def results(self):
102 raise NotImplementedError()
103
104 class remoteiterbatcher(iterbatcher):
105 def __init__(self, remote):
106 super(remoteiterbatcher, self).__init__()
107 self._remote = remote
108
109 def __getattr__(self, name):
110 # Validate this method is batchable, since submit() only supports
111 # batchable methods.
112 fn = getattr(self._remote, name)
113 if not getattr(fn, 'batchable', None):
114 raise error.ProgrammingError('Attempted to batch a non-batchable '
115 'call to %r' % name)
116
117 return super(remoteiterbatcher, self).__getattr__(name)
118
119 def submit(self):
120 """Break the batch request into many patch calls and pipeline them.
121
122 This is mostly valuable over http where request sizes can be
123 limited, but can be used in other places as well.
124 """
125 # 2-tuple of (command, arguments) that represents what will be
126 # sent over the wire.
127 requests = []
128
129 # 4-tuple of (command, final future, @batchable generator, remote
130 # future).
131 results = []
132
133 for command, args, opts, finalfuture in self.calls:
134 mtd = getattr(self._remote, command)
135 batchable = mtd.batchable(mtd.__self__, *args, **opts)
136
137 commandargs, fremote = next(batchable)
138 assert fremote
139 requests.append((command, commandargs))
140 results.append((command, finalfuture, batchable, fremote))
141
142 if requests:
143 self._resultiter = self._remote._submitbatch(requests)
144
145 self._results = results
146
147 def results(self):
148 for command, finalfuture, batchable, remotefuture in self._results:
149 # Get the raw result, set it in the remote future, feed it
150 # back into the @batchable generator so it can be decoded, and
151 # set the result on the final future to this value.
152 remoteresult = next(self._resultiter)
153 remotefuture.set(remoteresult)
154 finalfuture.set(next(batchable))
155
156 # Verify our @batchable generators only emit 2 values.
157 try:
158 next(batchable)
159 except StopIteration:
160 pass
161 else:
162 raise error.ProgrammingError('%s @batchable generator emitted '
163 'unexpected value count' % command)
164
165 yield finalfuture.value
166
167 76 def encodebatchcmds(req):
168 77 """Return a ``cmds`` argument value for the ``batch`` command."""
169 78 escapearg = wireprototypes.escapebatcharg
@@ -412,9 +321,6 b' class wirepeer(repository.legacypeer):'
412 321
413 322 # Begin of ipeercommands interface.
414 323
415 def iterbatch(self):
416 return remoteiterbatcher(self)
417
418 324 @batchable
419 325 def lookup(self, key):
420 326 self.requirecap('lookup', _('look up remote revision'))
@@ -7,10 +7,10 b''
7 7
8 8 from __future__ import absolute_import, print_function
9 9
10 import contextlib
11
10 12 from mercurial import (
11 error,
12 13 localrepo,
13 util,
14 14 wireprotov1peer,
15 15
16 16 )
@@ -30,9 +30,14 b' class localthing(thing):'
30 30 return "%s und %s" % (b, a,)
31 31 def greet(self, name=None):
32 32 return "Hello, %s" % name
33 def batchiter(self):
34 '''Support for local batching.'''
35 return localrepo.localiterbatcher(self)
33
34 @contextlib.contextmanager
35 def commandexecutor(self):
36 e = localrepo.localcommandexecutor(self)
37 try:
38 yield e
39 finally:
40 e.close()
36 41
37 42 # usage of "thing" interface
38 43 def use(it):
@@ -45,52 +50,15 b' def use(it):'
45 50 print(it.bar("Eins", "Zwei"))
46 51
47 52 # Batched call to a couple of proxied methods.
48 batch = it.batchiter()
49 # The calls return futures to eventually hold results.
50 foo = batch.foo(one="One", two="Two")
51 bar = batch.bar("Eins", "Zwei")
52 bar2 = batch.bar(b="Uno", a="Due")
53
54 # Future shouldn't be set until we submit().
55 assert isinstance(foo, wireprotov1peer.future)
56 assert not util.safehasattr(foo, 'value')
57 assert not util.safehasattr(bar, 'value')
58 batch.submit()
59 # Call results() to obtain results as a generator.
60 results = batch.results()
61 53
62 # Future results shouldn't be set until we consume a value.
63 assert not util.safehasattr(foo, 'value')
64 foovalue = next(results)
65 assert util.safehasattr(foo, 'value')
66 assert foovalue == foo.value
67 print(foo.value)
68 next(results)
69 print(bar.value)
70 next(results)
71 print(bar2.value)
54 with it.commandexecutor() as e:
55 ffoo = e.callcommand('foo', {'one': 'One', 'two': 'Two'})
56 fbar = e.callcommand('bar', {'b': 'Eins', 'a': 'Zwei'})
57 fbar2 = e.callcommand('bar', {'b': 'Uno', 'a': 'Due'})
72 58
73 # We should be at the end of the results generator.
74 try:
75 next(results)
76 except StopIteration:
77 print('proper end of results generator')
78 else:
79 print('extra emitted element!')
80
81 # Attempting to call a non-batchable method inside a batch fails.
82 batch = it.batchiter()
83 try:
84 batch.greet(name='John Smith')
85 except error.ProgrammingError as e:
86 print(e)
87
88 # Attempting to call a local method inside a batch fails.
89 batch = it.batchiter()
90 try:
91 batch.hello()
92 except error.ProgrammingError as e:
93 print(e)
59 print(ffoo.result())
60 print(fbar.result())
61 print(fbar2.result())
94 62
95 63 # local usage
96 64 mylocal = localthing()
@@ -177,8 +145,13 b' class remotething(thing):'
177 145 for r in res.split(';'):
178 146 yield r
179 147
180 def batchiter(self):
181 return wireprotov1peer.remoteiterbatcher(self)
148 @contextlib.contextmanager
149 def commandexecutor(self):
150 e = wireprotov1peer.peerexecutor(self)
151 try:
152 yield e
153 finally:
154 e.close()
182 155
183 156 @wireprotov1peer.batchable
184 157 def foo(self, one, two=None):
@@ -6,7 +6,6 b' Eins und Zwei'
6 6 One and Two
7 7 Eins und Zwei
8 8 Uno und Due
9 proper end of results generator
10 9
11 10 == Remote
12 11 Ready.
@@ -21,6 +20,3 b' REQ: batch?cmds=foo:one=Pof,two=Uxp;bar:'
21 20 One and Two
22 21 Eins und Zwei
23 22 Uno und Due
24 proper end of results generator
25 Attempted to batch a non-batchable call to 'greet'
26 Attempted to batch a non-batchable call to 'hello'
@@ -93,7 +93,9 b' srv = serverrepo()'
93 93 clt = clientpeer(srv, uimod.ui())
94 94
95 95 print(clt.greet(b"Foobar"))
96 b = clt.iterbatch()
97 list(map(b.greet, (b'Fo, =;:<o', b'Bar')))
98 b.submit()
99 print([r for r in b.results()])
96
97 with clt.commandexecutor() as e:
98 fgreet1 = e.callcommand(b'greet', {b'name': b'Fo, =;:<o'})
99 fgreet2 = e.callcommand(b'greet', {b'name': b'Bar'})
100
101 print([f.result() for f in (fgreet1, fgreet2)])
General Comments 0
You need to be logged in to leave comments. Login now