##// END OF EJS Templates
batching: migrate basic noop batching into peer.peer...
Augie Fackler -
r25912:cbbdd085 default
parent child Browse files
Show More
@@ -8,9 +8,85
8 8
9 9 from i18n import _
10 10 import error
11 import util
12
13 # abstract batching support
14
15 class future(object):
16 '''placeholder for a value to be set later'''
17 def set(self, value):
18 if util.safehasattr(self, 'value'):
19 raise error.RepoError("future is already set")
20 self.value = value
21
22 class batcher(object):
23 '''base class for batches of commands submittable in a single request
24
25 All methods invoked on instances of this class are simply queued and
26 return a a future for the result. Once you call submit(), all the queued
27 calls are performed and the results set in their respective futures.
28 '''
29 def __init__(self):
30 self.calls = []
31 def __getattr__(self, name):
32 def call(*args, **opts):
33 resref = future()
34 self.calls.append((name, args, opts, resref,))
35 return resref
36 return call
37 def submit(self):
38 pass
39
40 class localbatch(batcher):
41 '''performs the queued calls directly'''
42 def __init__(self, local):
43 batcher.__init__(self)
44 self.local = local
45 def submit(self):
46 for name, args, opts, resref in self.calls:
47 resref.set(getattr(self.local, name)(*args, **opts))
48
49 def batchable(f):
50 '''annotation for batchable methods
51
52 Such methods must implement a coroutine as follows:
53
54 @batchable
55 def sample(self, one, two=None):
56 # Handle locally computable results first:
57 if not one:
58 yield "a local result", None
59 # Build list of encoded arguments suitable for your wire protocol:
60 encargs = [('one', encode(one),), ('two', encode(two),)]
61 # Create future for injection of encoded result:
62 encresref = future()
63 # Return encoded arguments and future:
64 yield encargs, encresref
65 # Assuming the future to be filled with the result from the batched
66 # request now. Decode it:
67 yield decode(encresref.value)
68
69 The decorator returns a function which wraps this coroutine as a plain
70 method, but adds the original method as an attribute called "batchable",
71 which is used by remotebatch to split the call into separate encoding and
72 decoding phases.
73 '''
74 def plain(*args, **opts):
75 batchable = f(*args, **opts)
76 encargsorres, encresref = batchable.next()
77 if not encresref:
78 return encargsorres # a local result in this case
79 self = args[0]
80 encresref.set(self._submitone(f.func_name, encargsorres))
81 return batchable.next()
82 setattr(plain, 'batchable', f)
83 return plain
11 84
12 85 class peerrepository(object):
13 86
87 def batch(self):
88 return localbatch(self)
89
14 90 def capable(self, name):
15 91 '''tell whether repo supports named capability.
16 92 return False if not supported.
@@ -58,48 +58,12 class abstractserverproto(object):
58 58 Some protocols may have compressed the contents."""
59 59 raise NotImplementedError()
60 60
61 # abstract batching support
62
63 class future(object):
64 '''placeholder for a value to be set later'''
65 def set(self, value):
66 if util.safehasattr(self, 'value'):
67 raise error.RepoError("future is already set")
68 self.value = value
69
70 class batcher(object):
71 '''base class for batches of commands submittable in a single request
72
73 All methods invoked on instances of this class are simply queued and
74 return a a future for the result. Once you call submit(), all the queued
75 calls are performed and the results set in their respective futures.
76 '''
77 def __init__(self):
78 self.calls = []
79 def __getattr__(self, name):
80 def call(*args, **opts):
81 resref = future()
82 self.calls.append((name, args, opts, resref,))
83 return resref
84 return call
85 def submit(self):
86 pass
87
88 class localbatch(batcher):
89 '''performs the queued calls directly'''
90 def __init__(self, local):
91 batcher.__init__(self)
92 self.local = local
93 def submit(self):
94 for name, args, opts, resref in self.calls:
95 resref.set(getattr(self.local, name)(*args, **opts))
96
97 class remotebatch(batcher):
61 class remotebatch(peer.batcher):
98 62 '''batches the queued calls; uses as few roundtrips as possible'''
99 63 def __init__(self, remote):
100 64 '''remote must support _submitbatch(encbatch) and
101 65 _submitone(op, encargs)'''
102 batcher.__init__(self)
66 peer.batcher.__init__(self)
103 67 self.remote = remote
104 68 def submit(self):
105 69 req, rsp = [], []
@@ -128,41 +92,10 class remotebatch(batcher):
128 92 encresref.set(encres)
129 93 resref.set(batchable.next())
130 94
131 def batchable(f):
132 '''annotation for batchable methods
133
134 Such methods must implement a coroutine as follows:
135
136 @batchable
137 def sample(self, one, two=None):
138 # Handle locally computable results first:
139 if not one:
140 yield "a local result", None
141 # Build list of encoded arguments suitable for your wire protocol:
142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 # Create future for injection of encoded result:
144 encresref = future()
145 # Return encoded arguments and future:
146 yield encargs, encresref
147 # Assuming the future to be filled with the result from the batched
148 # request now. Decode it:
149 yield decode(encresref.value)
150
151 The decorator returns a function which wraps this coroutine as a plain
152 method, but adds the original method as an attribute called "batchable",
153 which is used by remotebatch to split the call into separate encoding and
154 decoding phases.
155 '''
156 def plain(*args, **opts):
157 batchable = f(*args, **opts)
158 encargsorres, encresref = batchable.next()
159 if not encresref:
160 return encargsorres # a local result in this case
161 self = args[0]
162 encresref.set(self._submitone(f.func_name, encargsorres))
163 return batchable.next()
164 setattr(plain, 'batchable', f)
165 return plain
95 # Forward a couple of names from peer to make wireproto interactions
96 # slightly more sensible.
97 batchable = peer.batchable
98 future = peer.future
166 99
167 100 # list of nodes encoding / decoding
168 101
@@ -5,7 +5,8
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 from mercurial.wireproto import localbatch, remotebatch, batchable, future
8 from mercurial.peer import localbatch, batchable, future
9 from mercurial.wireproto import remotebatch
9 10
10 11 # equivalent of repo.repository
11 12 class thing(object):
@@ -12,6 +12,10 class proto(object):
12 12 class clientpeer(wireproto.wirepeer):
13 13 def __init__(self, serverrepo):
14 14 self.serverrepo = serverrepo
15
16 def _capabilities(self):
17 return ['batch']
18
15 19 def _call(self, cmd, **args):
16 20 return wireproto.dispatch(self.serverrepo, proto(args), cmd)
17 21
General Comments 0
You need to be logged in to leave comments. Login now