##// END OF EJS Templates
wireproto: overhaul iterating batcher code (API)...
Gregory Szorc -
r33761:4c706037 default
parent child Browse files
Show More
@@ -69,7 +69,8 b' class localiterbatcher(iterbatcher):'
69
69
70 def results(self):
70 def results(self):
71 for name, args, opts, resref in self.calls:
71 for name, args, opts, resref in self.calls:
72 yield getattr(self.local, name)(*args, **opts)
72 resref.set(getattr(self.local, name)(*args, **opts))
73 yield resref.value
73
74
74 def batchable(f):
75 def batchable(f):
75 '''annotation for batchable methods
76 '''annotation for batchable methods
@@ -133,23 +133,47 b' class remoteiterbatcher(peer.iterbatcher'
133 This is mostly valuable over http where request sizes can be
133 This is mostly valuable over http where request sizes can be
134 limited, but can be used in other places as well.
134 limited, but can be used in other places as well.
135 """
135 """
136 req, rsp = [], []
136 # 2-tuple of (command, arguments) that represents what will be
137 for name, args, opts, resref in self.calls:
137 # sent over the wire.
138 mtd = getattr(self._remote, name)
138 requests = []
139
140 # 4-tuple of (command, final future, @batchable generator, remote
141 # future).
142 results = []
143
144 for command, args, opts, finalfuture in self.calls:
145 mtd = getattr(self._remote, command)
139 batchable = mtd.batchable(mtd.im_self, *args, **opts)
146 batchable = mtd.batchable(mtd.im_self, *args, **opts)
140 encargsorres, encresref = next(batchable)
147
141 assert encresref
148 commandargs, fremote = next(batchable)
142 req.append((name, encargsorres))
149 assert fremote
143 rsp.append((batchable, encresref))
150 requests.append((command, commandargs))
144 if req:
151 results.append((command, finalfuture, batchable, fremote))
145 self._resultiter = self._remote._submitbatch(req)
152
146 self._rsp = rsp
153 if requests:
154 self._resultiter = self._remote._submitbatch(requests)
155
156 self._results = results
147
157
148 def results(self):
158 def results(self):
149 for (batchable, encresref), encres in itertools.izip(
159 for command, finalfuture, batchable, remotefuture in self._results:
150 self._rsp, self._resultiter):
160 # Get the raw result, set it in the remote future, feed it
151 encresref.set(encres)
161 # back into the @batchable generator so it can be decoded, and
152 yield next(batchable)
162 # set the result on the final future to this value.
163 remoteresult = next(self._resultiter)
164 remotefuture.set(remoteresult)
165 finalfuture.set(next(batchable))
166
167 # Verify our @batchable generators only emit 2 values.
168 try:
169 next(batchable)
170 except StopIteration:
171 pass
172 else:
173 raise error.ProgrammingError('%s @batchable generator emitted '
174 'unexpected value count' % command)
175
176 yield finalfuture.value
153
177
154 # Forward a couple of names from peer to make wireproto interactions
178 # Forward a couple of names from peer to make wireproto interactions
155 # slightly more sensible.
179 # slightly more sensible.
@@ -8,7 +8,9 b''
8 from __future__ import absolute_import, print_function
8 from __future__ import absolute_import, print_function
9
9
10 from mercurial import (
10 from mercurial import (
11 error,
11 peer,
12 peer,
13 util,
12 wireproto,
14 wireproto,
13 )
15 )
14
16
@@ -27,9 +29,9 b' class localthing(thing):'
27 return "%s und %s" % (b, a,)
29 return "%s und %s" % (b, a,)
28 def greet(self, name=None):
30 def greet(self, name=None):
29 return "Hello, %s" % name
31 return "Hello, %s" % name
30 def batch(self):
32 def batchiter(self):
31 '''Support for local batching.'''
33 '''Support for local batching.'''
32 return peer.localbatch(self)
34 return peer.localiterbatcher(self)
33
35
34 # usage of "thing" interface
36 # usage of "thing" interface
35 def use(it):
37 def use(it):
@@ -41,27 +43,54 b' def use(it):'
41 print(it.foo("Un", two="Deux"))
43 print(it.foo("Un", two="Deux"))
42 print(it.bar("Eins", "Zwei"))
44 print(it.bar("Eins", "Zwei"))
43
45
44 # Batched call to a couple of (possibly proxied) methods.
46 # Batched call to a couple of proxied methods.
45 batch = it.batch()
47 batch = it.batchiter()
46 # The calls return futures to eventually hold results.
48 # The calls return futures to eventually hold results.
47 foo = batch.foo(one="One", two="Two")
49 foo = batch.foo(one="One", two="Two")
48 bar = batch.bar("Eins", "Zwei")
50 bar = batch.bar("Eins", "Zwei")
49 # We can call non-batchable proxy methods, but the break the current batch
50 # request and cause additional roundtrips.
51 greet = batch.greet(name="John Smith")
52 # We can also add local methods into the mix, but they break the batch too.
53 hello = batch.hello()
54 bar2 = batch.bar(b="Uno", a="Due")
51 bar2 = batch.bar(b="Uno", a="Due")
55 # Only now are all the calls executed in sequence, with as few roundtrips
52
56 # as possible.
53 # Future shouldn't be set until we submit().
54 assert isinstance(foo, peer.future)
55 assert not util.safehasattr(foo, 'value')
56 assert not util.safehasattr(bar, 'value')
57 batch.submit()
57 batch.submit()
58 # After the call to submit, the futures actually contain values.
58 # Call results() to obtain results as a generator.
59 results = batch.results()
60
61 # Future results shouldn't be set until we consume a value.
62 assert not util.safehasattr(foo, 'value')
63 foovalue = next(results)
64 assert util.safehasattr(foo, 'value')
65 assert foovalue == foo.value
59 print(foo.value)
66 print(foo.value)
67 next(results)
60 print(bar.value)
68 print(bar.value)
61 print(greet.value)
69 next(results)
62 print(hello.value)
63 print(bar2.value)
70 print(bar2.value)
64
71
72 # We should be at the end of the results generator.
73 try:
74 next(results)
75 except StopIteration:
76 print('proper end of results generator')
77 else:
78 print('extra emitted element!')
79
80 # Attempting to call a non-batchable method inside a batch fails.
81 batch = it.batchiter()
82 try:
83 batch.greet(name='John Smith')
84 except error.ProgrammingError as e:
85 print(e)
86
87 # Attempting to call a local method inside a batch fails.
88 batch = it.batchiter()
89 try:
90 batch.hello()
91 except error.ProgrammingError as e:
92 print(e)
93
65 # local usage
94 # local usage
66 mylocal = localthing()
95 mylocal = localthing()
67 print()
96 print()
@@ -144,10 +173,11 b' class remotething(thing):'
144 req.append(name + ':' + args)
173 req.append(name + ':' + args)
145 req = ';'.join(req)
174 req = ';'.join(req)
146 res = self._submitone('batch', [('cmds', req,)])
175 res = self._submitone('batch', [('cmds', req,)])
147 return res.split(';')
176 for r in res.split(';'):
177 yield r
148
178
149 def batch(self):
179 def batchiter(self):
150 return wireproto.remotebatch(self)
180 return wireproto.remoteiterbatcher(self)
151
181
152 @peer.batchable
182 @peer.batchable
153 def foo(self, one, two=None):
183 def foo(self, one, two=None):
@@ -5,9 +5,8 b' Un and Deux'
5 Eins und Zwei
5 Eins und Zwei
6 One and Two
6 One and Two
7 Eins und Zwei
7 Eins und Zwei
8 Hello, John Smith
9 Ready.
10 Uno und Due
8 Uno und Due
9 proper end of results generator
11
10
12 == Remote
11 == Remote
13 Ready.
12 Ready.
@@ -17,14 +16,11 b' Un and Deux'
17 REQ: bar?b=Fjot&a=[xfj
16 REQ: bar?b=Fjot&a=[xfj
18 -> Fjot!voe![xfj
17 -> Fjot!voe![xfj
19 Eins und Zwei
18 Eins und Zwei
20 REQ: batch?cmds=foo:one=Pof,two=Uxp;bar:b=Fjot,a=[xfj
19 REQ: batch?cmds=foo:one=Pof,two=Uxp;bar:b=Fjot,a=[xfj;bar:b=Vop,a=Evf
21 -> Pof!boe!Uxp;Fjot!voe![xfj
20 -> Pof!boe!Uxp;Fjot!voe![xfj;Vop!voe!Evf
22 REQ: greet?name=Kpio!Tnjui
23 -> Ifmmp-!Kpio!Tnjui
24 REQ: batch?cmds=bar:b=Vop,a=Evf
25 -> Vop!voe!Evf
26 One and Two
21 One and Two
27 Eins und Zwei
22 Eins und Zwei
28 Hello, John Smith
29 Ready.
30 Uno und Due
23 Uno und Due
24 proper end of results generator
25 Attempted to batch a non-batchable call to 'greet'
26 Attempted to batch a non-batchable call to 'hello'
General Comments 0
You need to be logged in to leave comments. Login now