Show More
@@ -69,7 +69,8 b' class localiterbatcher(iterbatcher):' | |||
|
69 | 69 | |
|
70 | 70 | def results(self): |
|
71 | 71 | for name, args, opts, resref in self.calls: |
|
72 |
|
|
|
72 | resref.set(getattr(self.local, name)(*args, **opts)) | |
|
73 | yield resref.value | |
|
73 | 74 | |
|
74 | 75 | def batchable(f): |
|
75 | 76 | '''annotation for batchable methods |
@@ -133,23 +133,47 b' class remoteiterbatcher(peer.iterbatcher' | |||
|
133 | 133 | This is mostly valuable over http where request sizes can be |
|
134 | 134 | limited, but can be used in other places as well. |
|
135 | 135 | """ |
|
136 | req, rsp = [], [] | |
|
137 | for name, args, opts, resref in self.calls: | |
|
138 | mtd = getattr(self._remote, name) | |
|
136 | # 2-tuple of (command, arguments) that represents what will be | |
|
137 | # sent over the wire. | |
|
138 | requests = [] | |
|
139 | ||
|
140 | # 4-tuple of (command, final future, @batchable generator, remote | |
|
141 | # future). | |
|
142 | results = [] | |
|
143 | ||
|
144 | for command, args, opts, finalfuture in self.calls: | |
|
145 | mtd = getattr(self._remote, command) | |
|
139 | 146 | batchable = mtd.batchable(mtd.im_self, *args, **opts) |
|
140 | encargsorres, encresref = next(batchable) | |
|
141 | assert encresref | |
|
142 | req.append((name, encargsorres)) | |
|
143 |
r |
|
|
144 | if req: | |
|
145 | self._resultiter = self._remote._submitbatch(req) | |
|
146 | self._rsp = rsp | |
|
147 | ||
|
148 | commandargs, fremote = next(batchable) | |
|
149 | assert fremote | |
|
150 | requests.append((command, commandargs)) | |
|
151 | results.append((command, finalfuture, batchable, fremote)) | |
|
152 | ||
|
153 | if requests: | |
|
154 | self._resultiter = self._remote._submitbatch(requests) | |
|
155 | ||
|
156 | self._results = results | |
|
147 | 157 | |
|
148 | 158 | def results(self): |
|
149 | for (batchable, encresref), encres in itertools.izip( | |
|
150 | self._rsp, self._resultiter): | |
|
151 | encresref.set(encres) | |
|
152 | yield next(batchable) | |
|
159 | for command, finalfuture, batchable, remotefuture in self._results: | |
|
160 | # Get the raw result, set it in the remote future, feed it | |
|
161 | # back into the @batchable generator so it can be decoded, and | |
|
162 | # set the result on the final future to this value. | |
|
163 | remoteresult = next(self._resultiter) | |
|
164 | remotefuture.set(remoteresult) | |
|
165 | finalfuture.set(next(batchable)) | |
|
166 | ||
|
167 | # Verify our @batchable generators only emit 2 values. | |
|
168 | try: | |
|
169 | next(batchable) | |
|
170 | except StopIteration: | |
|
171 | pass | |
|
172 | else: | |
|
173 | raise error.ProgrammingError('%s @batchable generator emitted ' | |
|
174 | 'unexpected value count' % command) | |
|
175 | ||
|
176 | yield finalfuture.value | |
|
153 | 177 | |
|
154 | 178 | # Forward a couple of names from peer to make wireproto interactions |
|
155 | 179 | # slightly more sensible. |
@@ -8,7 +8,9 b'' | |||
|
8 | 8 | from __future__ import absolute_import, print_function |
|
9 | 9 | |
|
10 | 10 | from mercurial import ( |
|
11 | error, | |
|
11 | 12 | peer, |
|
13 | util, | |
|
12 | 14 | wireproto, |
|
13 | 15 | ) |
|
14 | 16 | |
@@ -27,9 +29,9 b' class localthing(thing):' | |||
|
27 | 29 | return "%s und %s" % (b, a,) |
|
28 | 30 | def greet(self, name=None): |
|
29 | 31 | return "Hello, %s" % name |
|
30 | def batch(self): | |
|
32 | def batchiter(self): | |
|
31 | 33 | '''Support for local batching.''' |
|
32 | return peer.localbatch(self) | |
|
34 | return peer.localiterbatcher(self) | |
|
33 | 35 | |
|
34 | 36 | # usage of "thing" interface |
|
35 | 37 | def use(it): |
@@ -41,27 +43,54 b' def use(it):' | |||
|
41 | 43 | print(it.foo("Un", two="Deux")) |
|
42 | 44 | print(it.bar("Eins", "Zwei")) |
|
43 | 45 | |
|
44 |
# Batched call to a couple of |
|
|
45 | batch = it.batch() | |
|
46 | # Batched call to a couple of proxied methods. | |
|
47 | batch = it.batchiter() | |
|
46 | 48 | # The calls return futures to eventually hold results. |
|
47 | 49 | foo = batch.foo(one="One", two="Two") |
|
48 | 50 | bar = batch.bar("Eins", "Zwei") |
|
49 | # We can call non-batchable proxy methods, but the break the current batch | |
|
50 | # request and cause additional roundtrips. | |
|
51 | greet = batch.greet(name="John Smith") | |
|
52 | # We can also add local methods into the mix, but they break the batch too. | |
|
53 | hello = batch.hello() | |
|
54 | 51 | bar2 = batch.bar(b="Uno", a="Due") |
|
55 | # Only now are all the calls executed in sequence, with as few roundtrips | |
|
56 | # as possible. | |
|
52 | ||
|
53 | # Future shouldn't be set until we submit(). | |
|
54 | assert isinstance(foo, peer.future) | |
|
55 | assert not util.safehasattr(foo, 'value') | |
|
56 | assert not util.safehasattr(bar, 'value') | |
|
57 | 57 | batch.submit() |
|
58 | # After the call to submit, the futures actually contain values. | |
|
58 | # Call results() to obtain results as a generator. | |
|
59 | results = batch.results() | |
|
60 | ||
|
61 | # Future results shouldn't be set until we consume a value. | |
|
62 | assert not util.safehasattr(foo, 'value') | |
|
63 | foovalue = next(results) | |
|
64 | assert util.safehasattr(foo, 'value') | |
|
65 | assert foovalue == foo.value | |
|
59 | 66 | print(foo.value) |
|
67 | next(results) | |
|
60 | 68 | print(bar.value) |
|
61 | print(greet.value) | |
|
62 | print(hello.value) | |
|
69 | next(results) | |
|
63 | 70 | print(bar2.value) |
|
64 | 71 | |
|
72 | # We should be at the end of the results generator. | |
|
73 | try: | |
|
74 | next(results) | |
|
75 | except StopIteration: | |
|
76 | print('proper end of results generator') | |
|
77 | else: | |
|
78 | print('extra emitted element!') | |
|
79 | ||
|
80 | # Attempting to call a non-batchable method inside a batch fails. | |
|
81 | batch = it.batchiter() | |
|
82 | try: | |
|
83 | batch.greet(name='John Smith') | |
|
84 | except error.ProgrammingError as e: | |
|
85 | print(e) | |
|
86 | ||
|
87 | # Attempting to call a local method inside a batch fails. | |
|
88 | batch = it.batchiter() | |
|
89 | try: | |
|
90 | batch.hello() | |
|
91 | except error.ProgrammingError as e: | |
|
92 | print(e) | |
|
93 | ||
|
65 | 94 | # local usage |
|
66 | 95 | mylocal = localthing() |
|
67 | 96 | print() |
@@ -144,10 +173,11 b' class remotething(thing):' | |||
|
144 | 173 | req.append(name + ':' + args) |
|
145 | 174 | req = ';'.join(req) |
|
146 | 175 | res = self._submitone('batch', [('cmds', req,)]) |
|
147 |
|
|
|
176 | for r in res.split(';'): | |
|
177 | yield r | |
|
148 | 178 | |
|
149 | def batch(self): | |
|
150 | return wireproto.remotebatch(self) | |
|
179 | def batchiter(self): | |
|
180 | return wireproto.remoteiterbatcher(self) | |
|
151 | 181 | |
|
152 | 182 | @peer.batchable |
|
153 | 183 | def foo(self, one, two=None): |
@@ -5,9 +5,8 b' Un and Deux' | |||
|
5 | 5 | Eins und Zwei |
|
6 | 6 | One and Two |
|
7 | 7 | Eins und Zwei |
|
8 | Hello, John Smith | |
|
9 | Ready. | |
|
10 | 8 | Uno und Due |
|
9 | proper end of results generator | |
|
11 | 10 | |
|
12 | 11 | == Remote |
|
13 | 12 | Ready. |
@@ -17,14 +16,11 b' Un and Deux' | |||
|
17 | 16 | REQ: bar?b=Fjot&a=[xfj |
|
18 | 17 | -> Fjot!voe![xfj |
|
19 | 18 | Eins und Zwei |
|
20 | REQ: batch?cmds=foo:one=Pof,two=Uxp;bar:b=Fjot,a=[xfj | |
|
21 | -> Pof!boe!Uxp;Fjot!voe![xfj | |
|
22 | REQ: greet?name=Kpio!Tnjui | |
|
23 | -> Ifmmp-!Kpio!Tnjui | |
|
24 | REQ: batch?cmds=bar:b=Vop,a=Evf | |
|
25 | -> Vop!voe!Evf | |
|
19 | REQ: batch?cmds=foo:one=Pof,two=Uxp;bar:b=Fjot,a=[xfj;bar:b=Vop,a=Evf | |
|
20 | -> Pof!boe!Uxp;Fjot!voe![xfj;Vop!voe!Evf | |
|
26 | 21 | One and Two |
|
27 | 22 | Eins und Zwei |
|
28 | Hello, John Smith | |
|
29 | Ready. | |
|
30 | 23 | Uno und Due |
|
24 | proper end of results generator | |
|
25 | Attempted to batch a non-batchable call to 'greet' | |
|
26 | Attempted to batch a non-batchable call to 'hello' |
General Comments 0
You need to be logged in to leave comments.
Login now