Show More
@@ -66,7 +66,6 b' from . import (' | |||||
66 | txnutil, |
|
66 | txnutil, | |
67 | util, |
|
67 | util, | |
68 | vfs as vfsmod, |
|
68 | vfs as vfsmod, | |
69 | wireprotov1peer, |
|
|||
70 | ) |
|
69 | ) | |
71 | from .utils import ( |
|
70 | from .utils import ( | |
72 | procutil, |
|
71 | procutil, | |
@@ -154,20 +153,6 b" moderncaps = {'lookup', 'branchmap', 'pu" | |||||
154 | 'unbundle'} |
|
153 | 'unbundle'} | |
155 | legacycaps = moderncaps.union({'changegroupsubset'}) |
|
154 | legacycaps = moderncaps.union({'changegroupsubset'}) | |
156 |
|
155 | |||
157 | class localiterbatcher(wireprotov1peer.iterbatcher): |
|
|||
158 | def __init__(self, local): |
|
|||
159 | super(localiterbatcher, self).__init__() |
|
|||
160 | self.local = local |
|
|||
161 |
|
||||
162 | def submit(self): |
|
|||
163 | # submit for a local iter batcher is a noop |
|
|||
164 | pass |
|
|||
165 |
|
||||
166 | def results(self): |
|
|||
167 | for name, args, opts, resref in self.calls: |
|
|||
168 | resref.set(getattr(self.local, name)(*args, **opts)) |
|
|||
169 | yield resref.value |
|
|||
170 |
|
||||
171 | @zi.implementer(repository.ipeercommandexecutor) |
|
156 | @zi.implementer(repository.ipeercommandexecutor) | |
172 | class localcommandexecutor(object): |
|
157 | class localcommandexecutor(object): | |
173 | def __init__(self, peer): |
|
158 | def __init__(self, peer): | |
@@ -333,9 +318,6 b' class localpeer(repository.peer):' | |||||
333 | def commandexecutor(self): |
|
318 | def commandexecutor(self): | |
334 | return localcommandexecutor(self) |
|
319 | return localcommandexecutor(self) | |
335 |
|
320 | |||
336 | def iterbatch(self): |
|
|||
337 | return localiterbatcher(self) |
|
|||
338 |
|
||||
339 | # End of peer interface. |
|
321 | # End of peer interface. | |
340 |
|
322 | |||
341 | class locallegacypeer(repository.legacypeer, localpeer): |
|
323 | class locallegacypeer(repository.legacypeer, localpeer): |
@@ -284,29 +284,6 b' class ipeerbase(ipeerconnection, ipeerca' | |||||
284 |
|
284 | |||
285 | All peer instances must conform to this interface. |
|
285 | All peer instances must conform to this interface. | |
286 | """ |
|
286 | """ | |
287 | def iterbatch(): |
|
|||
288 | """Obtain an object to be used for multiple method calls. |
|
|||
289 |
|
||||
290 | Various operations call several methods on peer instances. If each |
|
|||
291 | method call were performed immediately and serially, this would |
|
|||
292 | require round trips to remote peers and/or would slow down execution. |
|
|||
293 |
|
||||
294 | Some peers have the ability to "batch" method calls to avoid costly |
|
|||
295 | round trips or to facilitate concurrent execution. |
|
|||
296 |
|
||||
297 | This method returns an object that can be used to indicate intent to |
|
|||
298 | perform batched method calls. |
|
|||
299 |
|
||||
300 | The returned object is a proxy of this peer. It intercepts calls to |
|
|||
301 | batchable methods and queues them instead of performing them |
|
|||
302 | immediately. This proxy object has a ``submit`` method that will |
|
|||
303 | perform all queued batchable method calls. A ``results()`` method |
|
|||
304 | exposes the results of queued/batched method calls. It is a generator |
|
|||
305 | of results in the order they were called. |
|
|||
306 |
|
||||
307 | Not all peers or wire protocol implementations may actually batch method |
|
|||
308 | calls. However, they must all support this API. |
|
|||
309 | """ |
|
|||
310 |
|
287 | |||
311 | class ipeerbaselegacycommands(ipeerbase, ipeerlegacycommands): |
|
288 | class ipeerbaselegacycommands(ipeerbase, ipeerlegacycommands): | |
312 | """Unified peer interface that supports legacy commands.""" |
|
289 | """Unified peer interface that supports legacy commands.""" |
@@ -73,97 +73,6 b' class future(object):' | |||||
73 | raise error.RepoError("future is already set") |
|
73 | raise error.RepoError("future is already set") | |
74 | self.value = value |
|
74 | self.value = value | |
75 |
|
75 | |||
76 | class batcher(object): |
|
|||
77 | '''base class for batches of commands submittable in a single request |
|
|||
78 |
|
||||
79 | All methods invoked on instances of this class are simply queued and |
|
|||
80 | return a a future for the result. Once you call submit(), all the queued |
|
|||
81 | calls are performed and the results set in their respective futures. |
|
|||
82 | ''' |
|
|||
83 | def __init__(self): |
|
|||
84 | self.calls = [] |
|
|||
85 | def __getattr__(self, name): |
|
|||
86 | def call(*args, **opts): |
|
|||
87 | resref = future() |
|
|||
88 | # Please don't invent non-ascii method names, or you will |
|
|||
89 | # give core hg a very sad time. |
|
|||
90 | self.calls.append((name.encode('ascii'), args, opts, resref,)) |
|
|||
91 | return resref |
|
|||
92 | return call |
|
|||
93 | def submit(self): |
|
|||
94 | raise NotImplementedError() |
|
|||
95 |
|
||||
96 | class iterbatcher(batcher): |
|
|||
97 |
|
||||
98 | def submit(self): |
|
|||
99 | raise NotImplementedError() |
|
|||
100 |
|
||||
101 | def results(self): |
|
|||
102 | raise NotImplementedError() |
|
|||
103 |
|
||||
104 | class remoteiterbatcher(iterbatcher): |
|
|||
105 | def __init__(self, remote): |
|
|||
106 | super(remoteiterbatcher, self).__init__() |
|
|||
107 | self._remote = remote |
|
|||
108 |
|
||||
109 | def __getattr__(self, name): |
|
|||
110 | # Validate this method is batchable, since submit() only supports |
|
|||
111 | # batchable methods. |
|
|||
112 | fn = getattr(self._remote, name) |
|
|||
113 | if not getattr(fn, 'batchable', None): |
|
|||
114 | raise error.ProgrammingError('Attempted to batch a non-batchable ' |
|
|||
115 | 'call to %r' % name) |
|
|||
116 |
|
||||
117 | return super(remoteiterbatcher, self).__getattr__(name) |
|
|||
118 |
|
||||
119 | def submit(self): |
|
|||
120 | """Break the batch request into many patch calls and pipeline them. |
|
|||
121 |
|
||||
122 | This is mostly valuable over http where request sizes can be |
|
|||
123 | limited, but can be used in other places as well. |
|
|||
124 | """ |
|
|||
125 | # 2-tuple of (command, arguments) that represents what will be |
|
|||
126 | # sent over the wire. |
|
|||
127 | requests = [] |
|
|||
128 |
|
||||
129 | # 4-tuple of (command, final future, @batchable generator, remote |
|
|||
130 | # future). |
|
|||
131 | results = [] |
|
|||
132 |
|
||||
133 | for command, args, opts, finalfuture in self.calls: |
|
|||
134 | mtd = getattr(self._remote, command) |
|
|||
135 | batchable = mtd.batchable(mtd.__self__, *args, **opts) |
|
|||
136 |
|
||||
137 | commandargs, fremote = next(batchable) |
|
|||
138 | assert fremote |
|
|||
139 | requests.append((command, commandargs)) |
|
|||
140 | results.append((command, finalfuture, batchable, fremote)) |
|
|||
141 |
|
||||
142 | if requests: |
|
|||
143 | self._resultiter = self._remote._submitbatch(requests) |
|
|||
144 |
|
||||
145 | self._results = results |
|
|||
146 |
|
||||
147 | def results(self): |
|
|||
148 | for command, finalfuture, batchable, remotefuture in self._results: |
|
|||
149 | # Get the raw result, set it in the remote future, feed it |
|
|||
150 | # back into the @batchable generator so it can be decoded, and |
|
|||
151 | # set the result on the final future to this value. |
|
|||
152 | remoteresult = next(self._resultiter) |
|
|||
153 | remotefuture.set(remoteresult) |
|
|||
154 | finalfuture.set(next(batchable)) |
|
|||
155 |
|
||||
156 | # Verify our @batchable generators only emit 2 values. |
|
|||
157 | try: |
|
|||
158 | next(batchable) |
|
|||
159 | except StopIteration: |
|
|||
160 | pass |
|
|||
161 | else: |
|
|||
162 | raise error.ProgrammingError('%s @batchable generator emitted ' |
|
|||
163 | 'unexpected value count' % command) |
|
|||
164 |
|
||||
165 | yield finalfuture.value |
|
|||
166 |
|
||||
167 | def encodebatchcmds(req): |
|
76 | def encodebatchcmds(req): | |
168 | """Return a ``cmds`` argument value for the ``batch`` command.""" |
|
77 | """Return a ``cmds`` argument value for the ``batch`` command.""" | |
169 | escapearg = wireprototypes.escapebatcharg |
|
78 | escapearg = wireprototypes.escapebatcharg | |
@@ -412,9 +321,6 b' class wirepeer(repository.legacypeer):' | |||||
412 |
|
321 | |||
413 | # Begin of ipeercommands interface. |
|
322 | # Begin of ipeercommands interface. | |
414 |
|
323 | |||
415 | def iterbatch(self): |
|
|||
416 | return remoteiterbatcher(self) |
|
|||
417 |
|
||||
418 | @batchable |
|
324 | @batchable | |
419 | def lookup(self, key): |
|
325 | def lookup(self, key): | |
420 | self.requirecap('lookup', _('look up remote revision')) |
|
326 | self.requirecap('lookup', _('look up remote revision')) |
@@ -7,10 +7,10 b'' | |||||
7 |
|
7 | |||
8 | from __future__ import absolute_import, print_function |
|
8 | from __future__ import absolute_import, print_function | |
9 |
|
9 | |||
|
10 | import contextlib | |||
|
11 | ||||
10 | from mercurial import ( |
|
12 | from mercurial import ( | |
11 | error, |
|
|||
12 | localrepo, |
|
13 | localrepo, | |
13 | util, |
|
|||
14 | wireprotov1peer, |
|
14 | wireprotov1peer, | |
15 |
|
15 | |||
16 | ) |
|
16 | ) | |
@@ -30,9 +30,14 b' class localthing(thing):' | |||||
30 | return "%s und %s" % (b, a,) |
|
30 | return "%s und %s" % (b, a,) | |
31 | def greet(self, name=None): |
|
31 | def greet(self, name=None): | |
32 | return "Hello, %s" % name |
|
32 | return "Hello, %s" % name | |
33 | def batchiter(self): |
|
33 | ||
34 | '''Support for local batching.''' |
|
34 | @contextlib.contextmanager | |
35 | return localrepo.localiterbatcher(self) |
|
35 | def commandexecutor(self): | |
|
36 | e = localrepo.localcommandexecutor(self) | |||
|
37 | try: | |||
|
38 | yield e | |||
|
39 | finally: | |||
|
40 | e.close() | |||
36 |
|
41 | |||
37 | # usage of "thing" interface |
|
42 | # usage of "thing" interface | |
38 | def use(it): |
|
43 | def use(it): | |
@@ -45,52 +50,15 b' def use(it):' | |||||
45 | print(it.bar("Eins", "Zwei")) |
|
50 | print(it.bar("Eins", "Zwei")) | |
46 |
|
51 | |||
47 | # Batched call to a couple of proxied methods. |
|
52 | # Batched call to a couple of proxied methods. | |
48 | batch = it.batchiter() |
|
|||
49 | # The calls return futures to eventually hold results. |
|
|||
50 | foo = batch.foo(one="One", two="Two") |
|
|||
51 | bar = batch.bar("Eins", "Zwei") |
|
|||
52 | bar2 = batch.bar(b="Uno", a="Due") |
|
|||
53 |
|
||||
54 | # Future shouldn't be set until we submit(). |
|
|||
55 | assert isinstance(foo, wireprotov1peer.future) |
|
|||
56 | assert not util.safehasattr(foo, 'value') |
|
|||
57 | assert not util.safehasattr(bar, 'value') |
|
|||
58 | batch.submit() |
|
|||
59 | # Call results() to obtain results as a generator. |
|
|||
60 | results = batch.results() |
|
|||
61 |
|
53 | |||
62 | # Future results shouldn't be set until we consume a value. |
|
54 | with it.commandexecutor() as e: | |
63 | assert not util.safehasattr(foo, 'value') |
|
55 | ffoo = e.callcommand('foo', {'one': 'One', 'two': 'Two'}) | |
64 | foovalue = next(results) |
|
56 | fbar = e.callcommand('bar', {'b': 'Eins', 'a': 'Zwei'}) | |
65 | assert util.safehasattr(foo, 'value') |
|
57 | fbar2 = e.callcommand('bar', {'b': 'Uno', 'a': 'Due'}) | |
66 | assert foovalue == foo.value |
|
|||
67 | print(foo.value) |
|
|||
68 | next(results) |
|
|||
69 | print(bar.value) |
|
|||
70 | next(results) |
|
|||
71 | print(bar2.value) |
|
|||
72 |
|
58 | |||
73 | # We should be at the end of the results generator. |
|
59 | print(ffoo.result()) | |
74 | try: |
|
60 | print(fbar.result()) | |
75 |
|
|
61 | print(fbar2.result()) | |
76 | except StopIteration: |
|
|||
77 | print('proper end of results generator') |
|
|||
78 | else: |
|
|||
79 | print('extra emitted element!') |
|
|||
80 |
|
||||
81 | # Attempting to call a non-batchable method inside a batch fails. |
|
|||
82 | batch = it.batchiter() |
|
|||
83 | try: |
|
|||
84 | batch.greet(name='John Smith') |
|
|||
85 | except error.ProgrammingError as e: |
|
|||
86 | print(e) |
|
|||
87 |
|
||||
88 | # Attempting to call a local method inside a batch fails. |
|
|||
89 | batch = it.batchiter() |
|
|||
90 | try: |
|
|||
91 | batch.hello() |
|
|||
92 | except error.ProgrammingError as e: |
|
|||
93 | print(e) |
|
|||
94 |
|
62 | |||
95 | # local usage |
|
63 | # local usage | |
96 | mylocal = localthing() |
|
64 | mylocal = localthing() | |
@@ -177,8 +145,13 b' class remotething(thing):' | |||||
177 | for r in res.split(';'): |
|
145 | for r in res.split(';'): | |
178 | yield r |
|
146 | yield r | |
179 |
|
147 | |||
180 | def batchiter(self): |
|
148 | @contextlib.contextmanager | |
181 | return wireprotov1peer.remoteiterbatcher(self) |
|
149 | def commandexecutor(self): | |
|
150 | e = wireprotov1peer.peerexecutor(self) | |||
|
151 | try: | |||
|
152 | yield e | |||
|
153 | finally: | |||
|
154 | e.close() | |||
182 |
|
155 | |||
183 | @wireprotov1peer.batchable |
|
156 | @wireprotov1peer.batchable | |
184 | def foo(self, one, two=None): |
|
157 | def foo(self, one, two=None): |
@@ -6,7 +6,6 b' Eins und Zwei' | |||||
6 | One and Two |
|
6 | One and Two | |
7 | Eins und Zwei |
|
7 | Eins und Zwei | |
8 | Uno und Due |
|
8 | Uno und Due | |
9 | proper end of results generator |
|
|||
10 |
|
9 | |||
11 | == Remote |
|
10 | == Remote | |
12 | Ready. |
|
11 | Ready. | |
@@ -21,6 +20,3 b' REQ: batch?cmds=foo:one=Pof,two=Uxp;bar:' | |||||
21 | One and Two |
|
20 | One and Two | |
22 | Eins und Zwei |
|
21 | Eins und Zwei | |
23 | Uno und Due |
|
22 | Uno und Due | |
24 | proper end of results generator |
|
|||
25 | Attempted to batch a non-batchable call to 'greet' |
|
|||
26 | Attempted to batch a non-batchable call to 'hello' |
|
@@ -93,7 +93,9 b' srv = serverrepo()' | |||||
93 | clt = clientpeer(srv, uimod.ui()) |
|
93 | clt = clientpeer(srv, uimod.ui()) | |
94 |
|
94 | |||
95 | print(clt.greet(b"Foobar")) |
|
95 | print(clt.greet(b"Foobar")) | |
96 | b = clt.iterbatch() |
|
96 | ||
97 | list(map(b.greet, (b'Fo, =;:<o', b'Bar'))) |
|
97 | with clt.commandexecutor() as e: | |
98 | b.submit() |
|
98 | fgreet1 = e.callcommand(b'greet', {b'name': b'Fo, =;:<o'}) | |
99 | print([r for r in b.results()]) |
|
99 | fgreet2 = e.callcommand(b'greet', {b'name': b'Bar'}) | |
|
100 | ||||
|
101 | print([f.result() for f in (fgreet1, fgreet2)]) |
General Comments 0
You need to be logged in to leave comments.
Login now