##// END OF EJS Templates
wireproto: remove iterbatch() from peer interface (API)...
Gregory Szorc -
r37651:33a6eee0 default
parent child Browse files
Show More
@@ -66,7 +66,6 b' from . import ('
66 txnutil,
66 txnutil,
67 util,
67 util,
68 vfs as vfsmod,
68 vfs as vfsmod,
69 wireprotov1peer,
70 )
69 )
71 from .utils import (
70 from .utils import (
72 procutil,
71 procutil,
@@ -154,20 +153,6 b" moderncaps = {'lookup', 'branchmap', 'pu"
154 'unbundle'}
153 'unbundle'}
155 legacycaps = moderncaps.union({'changegroupsubset'})
154 legacycaps = moderncaps.union({'changegroupsubset'})
156
155
157 class localiterbatcher(wireprotov1peer.iterbatcher):
158 def __init__(self, local):
159 super(localiterbatcher, self).__init__()
160 self.local = local
161
162 def submit(self):
163 # submit for a local iter batcher is a noop
164 pass
165
166 def results(self):
167 for name, args, opts, resref in self.calls:
168 resref.set(getattr(self.local, name)(*args, **opts))
169 yield resref.value
170
171 @zi.implementer(repository.ipeercommandexecutor)
156 @zi.implementer(repository.ipeercommandexecutor)
172 class localcommandexecutor(object):
157 class localcommandexecutor(object):
173 def __init__(self, peer):
158 def __init__(self, peer):
@@ -333,9 +318,6 b' class localpeer(repository.peer):'
333 def commandexecutor(self):
318 def commandexecutor(self):
334 return localcommandexecutor(self)
319 return localcommandexecutor(self)
335
320
336 def iterbatch(self):
337 return localiterbatcher(self)
338
339 # End of peer interface.
321 # End of peer interface.
340
322
341 class locallegacypeer(repository.legacypeer, localpeer):
323 class locallegacypeer(repository.legacypeer, localpeer):
@@ -284,29 +284,6 b' class ipeerbase(ipeerconnection, ipeerca'
284
284
285 All peer instances must conform to this interface.
285 All peer instances must conform to this interface.
286 """
286 """
287 def iterbatch():
288 """Obtain an object to be used for multiple method calls.
289
290 Various operations call several methods on peer instances. If each
291 method call were performed immediately and serially, this would
292 require round trips to remote peers and/or would slow down execution.
293
294 Some peers have the ability to "batch" method calls to avoid costly
295 round trips or to facilitate concurrent execution.
296
297 This method returns an object that can be used to indicate intent to
298 perform batched method calls.
299
300 The returned object is a proxy of this peer. It intercepts calls to
301 batchable methods and queues them instead of performing them
302 immediately. This proxy object has a ``submit`` method that will
303 perform all queued batchable method calls. A ``results()`` method
304 exposes the results of queued/batched method calls. It is a generator
305 of results in the order they were called.
306
307 Not all peers or wire protocol implementations may actually batch method
308 calls. However, they must all support this API.
309 """
310
287
311 class ipeerbaselegacycommands(ipeerbase, ipeerlegacycommands):
288 class ipeerbaselegacycommands(ipeerbase, ipeerlegacycommands):
312 """Unified peer interface that supports legacy commands."""
289 """Unified peer interface that supports legacy commands."""
@@ -73,97 +73,6 b' class future(object):'
73 raise error.RepoError("future is already set")
73 raise error.RepoError("future is already set")
74 self.value = value
74 self.value = value
75
75
76 class batcher(object):
77 '''base class for batches of commands submittable in a single request
78
79 All methods invoked on instances of this class are simply queued and
80 return a a future for the result. Once you call submit(), all the queued
81 calls are performed and the results set in their respective futures.
82 '''
83 def __init__(self):
84 self.calls = []
85 def __getattr__(self, name):
86 def call(*args, **opts):
87 resref = future()
88 # Please don't invent non-ascii method names, or you will
89 # give core hg a very sad time.
90 self.calls.append((name.encode('ascii'), args, opts, resref,))
91 return resref
92 return call
93 def submit(self):
94 raise NotImplementedError()
95
96 class iterbatcher(batcher):
97
98 def submit(self):
99 raise NotImplementedError()
100
101 def results(self):
102 raise NotImplementedError()
103
104 class remoteiterbatcher(iterbatcher):
105 def __init__(self, remote):
106 super(remoteiterbatcher, self).__init__()
107 self._remote = remote
108
109 def __getattr__(self, name):
110 # Validate this method is batchable, since submit() only supports
111 # batchable methods.
112 fn = getattr(self._remote, name)
113 if not getattr(fn, 'batchable', None):
114 raise error.ProgrammingError('Attempted to batch a non-batchable '
115 'call to %r' % name)
116
117 return super(remoteiterbatcher, self).__getattr__(name)
118
119 def submit(self):
120 """Break the batch request into many patch calls and pipeline them.
121
122 This is mostly valuable over http where request sizes can be
123 limited, but can be used in other places as well.
124 """
125 # 2-tuple of (command, arguments) that represents what will be
126 # sent over the wire.
127 requests = []
128
129 # 4-tuple of (command, final future, @batchable generator, remote
130 # future).
131 results = []
132
133 for command, args, opts, finalfuture in self.calls:
134 mtd = getattr(self._remote, command)
135 batchable = mtd.batchable(mtd.__self__, *args, **opts)
136
137 commandargs, fremote = next(batchable)
138 assert fremote
139 requests.append((command, commandargs))
140 results.append((command, finalfuture, batchable, fremote))
141
142 if requests:
143 self._resultiter = self._remote._submitbatch(requests)
144
145 self._results = results
146
147 def results(self):
148 for command, finalfuture, batchable, remotefuture in self._results:
149 # Get the raw result, set it in the remote future, feed it
150 # back into the @batchable generator so it can be decoded, and
151 # set the result on the final future to this value.
152 remoteresult = next(self._resultiter)
153 remotefuture.set(remoteresult)
154 finalfuture.set(next(batchable))
155
156 # Verify our @batchable generators only emit 2 values.
157 try:
158 next(batchable)
159 except StopIteration:
160 pass
161 else:
162 raise error.ProgrammingError('%s @batchable generator emitted '
163 'unexpected value count' % command)
164
165 yield finalfuture.value
166
167 def encodebatchcmds(req):
76 def encodebatchcmds(req):
168 """Return a ``cmds`` argument value for the ``batch`` command."""
77 """Return a ``cmds`` argument value for the ``batch`` command."""
169 escapearg = wireprototypes.escapebatcharg
78 escapearg = wireprototypes.escapebatcharg
@@ -412,9 +321,6 b' class wirepeer(repository.legacypeer):'
412
321
413 # Begin of ipeercommands interface.
322 # Begin of ipeercommands interface.
414
323
415 def iterbatch(self):
416 return remoteiterbatcher(self)
417
418 @batchable
324 @batchable
419 def lookup(self, key):
325 def lookup(self, key):
420 self.requirecap('lookup', _('look up remote revision'))
326 self.requirecap('lookup', _('look up remote revision'))
@@ -7,10 +7,10 b''
7
7
8 from __future__ import absolute_import, print_function
8 from __future__ import absolute_import, print_function
9
9
10 import contextlib
11
10 from mercurial import (
12 from mercurial import (
11 error,
12 localrepo,
13 localrepo,
13 util,
14 wireprotov1peer,
14 wireprotov1peer,
15
15
16 )
16 )
@@ -30,9 +30,14 b' class localthing(thing):'
30 return "%s und %s" % (b, a,)
30 return "%s und %s" % (b, a,)
31 def greet(self, name=None):
31 def greet(self, name=None):
32 return "Hello, %s" % name
32 return "Hello, %s" % name
33 def batchiter(self):
33
34 '''Support for local batching.'''
34 @contextlib.contextmanager
35 return localrepo.localiterbatcher(self)
35 def commandexecutor(self):
36 e = localrepo.localcommandexecutor(self)
37 try:
38 yield e
39 finally:
40 e.close()
36
41
37 # usage of "thing" interface
42 # usage of "thing" interface
38 def use(it):
43 def use(it):
@@ -45,52 +50,15 b' def use(it):'
45 print(it.bar("Eins", "Zwei"))
50 print(it.bar("Eins", "Zwei"))
46
51
47 # Batched call to a couple of proxied methods.
52 # Batched call to a couple of proxied methods.
48 batch = it.batchiter()
49 # The calls return futures to eventually hold results.
50 foo = batch.foo(one="One", two="Two")
51 bar = batch.bar("Eins", "Zwei")
52 bar2 = batch.bar(b="Uno", a="Due")
53
54 # Future shouldn't be set until we submit().
55 assert isinstance(foo, wireprotov1peer.future)
56 assert not util.safehasattr(foo, 'value')
57 assert not util.safehasattr(bar, 'value')
58 batch.submit()
59 # Call results() to obtain results as a generator.
60 results = batch.results()
61
53
62 # Future results shouldn't be set until we consume a value.
54 with it.commandexecutor() as e:
63 assert not util.safehasattr(foo, 'value')
55 ffoo = e.callcommand('foo', {'one': 'One', 'two': 'Two'})
64 foovalue = next(results)
56 fbar = e.callcommand('bar', {'b': 'Eins', 'a': 'Zwei'})
65 assert util.safehasattr(foo, 'value')
57 fbar2 = e.callcommand('bar', {'b': 'Uno', 'a': 'Due'})
66 assert foovalue == foo.value
67 print(foo.value)
68 next(results)
69 print(bar.value)
70 next(results)
71 print(bar2.value)
72
58
73 # We should be at the end of the results generator.
59 print(ffoo.result())
74 try:
60 print(fbar.result())
75 next(results)
61 print(fbar2.result())
76 except StopIteration:
77 print('proper end of results generator')
78 else:
79 print('extra emitted element!')
80
81 # Attempting to call a non-batchable method inside a batch fails.
82 batch = it.batchiter()
83 try:
84 batch.greet(name='John Smith')
85 except error.ProgrammingError as e:
86 print(e)
87
88 # Attempting to call a local method inside a batch fails.
89 batch = it.batchiter()
90 try:
91 batch.hello()
92 except error.ProgrammingError as e:
93 print(e)
94
62
95 # local usage
63 # local usage
96 mylocal = localthing()
64 mylocal = localthing()
@@ -177,8 +145,13 b' class remotething(thing):'
177 for r in res.split(';'):
145 for r in res.split(';'):
178 yield r
146 yield r
179
147
180 def batchiter(self):
148 @contextlib.contextmanager
181 return wireprotov1peer.remoteiterbatcher(self)
149 def commandexecutor(self):
150 e = wireprotov1peer.peerexecutor(self)
151 try:
152 yield e
153 finally:
154 e.close()
182
155
183 @wireprotov1peer.batchable
156 @wireprotov1peer.batchable
184 def foo(self, one, two=None):
157 def foo(self, one, two=None):
@@ -6,7 +6,6 b' Eins und Zwei'
6 One and Two
6 One and Two
7 Eins und Zwei
7 Eins und Zwei
8 Uno und Due
8 Uno und Due
9 proper end of results generator
10
9
11 == Remote
10 == Remote
12 Ready.
11 Ready.
@@ -21,6 +20,3 b' REQ: batch?cmds=foo:one=Pof,two=Uxp;bar:'
21 One and Two
20 One and Two
22 Eins und Zwei
21 Eins und Zwei
23 Uno und Due
22 Uno und Due
24 proper end of results generator
25 Attempted to batch a non-batchable call to 'greet'
26 Attempted to batch a non-batchable call to 'hello'
@@ -93,7 +93,9 b' srv = serverrepo()'
93 clt = clientpeer(srv, uimod.ui())
93 clt = clientpeer(srv, uimod.ui())
94
94
95 print(clt.greet(b"Foobar"))
95 print(clt.greet(b"Foobar"))
96 b = clt.iterbatch()
96
97 list(map(b.greet, (b'Fo, =;:<o', b'Bar')))
97 with clt.commandexecutor() as e:
98 b.submit()
98 fgreet1 = e.callcommand(b'greet', {b'name': b'Fo, =;:<o'})
99 print([r for r in b.results()])
99 fgreet2 = e.callcommand(b'greet', {b'name': b'Bar'})
100
101 print([f.result() for f in (fgreet1, fgreet2)])
General Comments 0
You need to be logged in to leave comments. Login now