##// END OF EJS Templates
batching: migrate basic noop batching into peer.peer...
Augie Fackler -
r25912:cbbdd085 default
parent child Browse files
Show More
@@ -8,9 +8,85 b''
8
8
9 from i18n import _
9 from i18n import _
10 import error
10 import error
11 import util
12
13 # abstract batching support
14
15 class future(object):
16 '''placeholder for a value to be set later'''
17 def set(self, value):
18 if util.safehasattr(self, 'value'):
19 raise error.RepoError("future is already set")
20 self.value = value
21
22 class batcher(object):
23 '''base class for batches of commands submittable in a single request
24
25 All methods invoked on instances of this class are simply queued and
26 return a a future for the result. Once you call submit(), all the queued
27 calls are performed and the results set in their respective futures.
28 '''
29 def __init__(self):
30 self.calls = []
31 def __getattr__(self, name):
32 def call(*args, **opts):
33 resref = future()
34 self.calls.append((name, args, opts, resref,))
35 return resref
36 return call
37 def submit(self):
38 pass
39
40 class localbatch(batcher):
41 '''performs the queued calls directly'''
42 def __init__(self, local):
43 batcher.__init__(self)
44 self.local = local
45 def submit(self):
46 for name, args, opts, resref in self.calls:
47 resref.set(getattr(self.local, name)(*args, **opts))
48
49 def batchable(f):
50 '''annotation for batchable methods
51
52 Such methods must implement a coroutine as follows:
53
54 @batchable
55 def sample(self, one, two=None):
56 # Handle locally computable results first:
57 if not one:
58 yield "a local result", None
59 # Build list of encoded arguments suitable for your wire protocol:
60 encargs = [('one', encode(one),), ('two', encode(two),)]
61 # Create future for injection of encoded result:
62 encresref = future()
63 # Return encoded arguments and future:
64 yield encargs, encresref
65 # Assuming the future to be filled with the result from the batched
66 # request now. Decode it:
67 yield decode(encresref.value)
68
69 The decorator returns a function which wraps this coroutine as a plain
70 method, but adds the original method as an attribute called "batchable",
71 which is used by remotebatch to split the call into separate encoding and
72 decoding phases.
73 '''
74 def plain(*args, **opts):
75 batchable = f(*args, **opts)
76 encargsorres, encresref = batchable.next()
77 if not encresref:
78 return encargsorres # a local result in this case
79 self = args[0]
80 encresref.set(self._submitone(f.func_name, encargsorres))
81 return batchable.next()
82 setattr(plain, 'batchable', f)
83 return plain
11
84
12 class peerrepository(object):
85 class peerrepository(object):
13
86
87 def batch(self):
88 return localbatch(self)
89
14 def capable(self, name):
90 def capable(self, name):
15 '''tell whether repo supports named capability.
91 '''tell whether repo supports named capability.
16 return False if not supported.
92 return False if not supported.
@@ -58,48 +58,12 b' class abstractserverproto(object):'
58 Some protocols may have compressed the contents."""
58 Some protocols may have compressed the contents."""
59 raise NotImplementedError()
59 raise NotImplementedError()
60
60
61 # abstract batching support
61 class remotebatch(peer.batcher):
62
63 class future(object):
64 '''placeholder for a value to be set later'''
65 def set(self, value):
66 if util.safehasattr(self, 'value'):
67 raise error.RepoError("future is already set")
68 self.value = value
69
70 class batcher(object):
71 '''base class for batches of commands submittable in a single request
72
73 All methods invoked on instances of this class are simply queued and
74 return a a future for the result. Once you call submit(), all the queued
75 calls are performed and the results set in their respective futures.
76 '''
77 def __init__(self):
78 self.calls = []
79 def __getattr__(self, name):
80 def call(*args, **opts):
81 resref = future()
82 self.calls.append((name, args, opts, resref,))
83 return resref
84 return call
85 def submit(self):
86 pass
87
88 class localbatch(batcher):
89 '''performs the queued calls directly'''
90 def __init__(self, local):
91 batcher.__init__(self)
92 self.local = local
93 def submit(self):
94 for name, args, opts, resref in self.calls:
95 resref.set(getattr(self.local, name)(*args, **opts))
96
97 class remotebatch(batcher):
98 '''batches the queued calls; uses as few roundtrips as possible'''
62 '''batches the queued calls; uses as few roundtrips as possible'''
99 def __init__(self, remote):
63 def __init__(self, remote):
100 '''remote must support _submitbatch(encbatch) and
64 '''remote must support _submitbatch(encbatch) and
101 _submitone(op, encargs)'''
65 _submitone(op, encargs)'''
102 batcher.__init__(self)
66 peer.batcher.__init__(self)
103 self.remote = remote
67 self.remote = remote
104 def submit(self):
68 def submit(self):
105 req, rsp = [], []
69 req, rsp = [], []
@@ -128,41 +92,10 b' class remotebatch(batcher):'
128 encresref.set(encres)
92 encresref.set(encres)
129 resref.set(batchable.next())
93 resref.set(batchable.next())
130
94
131 def batchable(f):
95 # Forward a couple of names from peer to make wireproto interactions
132 '''annotation for batchable methods
96 # slightly more sensible.
133
97 batchable = peer.batchable
134 Such methods must implement a coroutine as follows:
98 future = peer.future
135
136 @batchable
137 def sample(self, one, two=None):
138 # Handle locally computable results first:
139 if not one:
140 yield "a local result", None
141 # Build list of encoded arguments suitable for your wire protocol:
142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 # Create future for injection of encoded result:
144 encresref = future()
145 # Return encoded arguments and future:
146 yield encargs, encresref
147 # Assuming the future to be filled with the result from the batched
148 # request now. Decode it:
149 yield decode(encresref.value)
150
151 The decorator returns a function which wraps this coroutine as a plain
152 method, but adds the original method as an attribute called "batchable",
153 which is used by remotebatch to split the call into separate encoding and
154 decoding phases.
155 '''
156 def plain(*args, **opts):
157 batchable = f(*args, **opts)
158 encargsorres, encresref = batchable.next()
159 if not encresref:
160 return encargsorres # a local result in this case
161 self = args[0]
162 encresref.set(self._submitone(f.func_name, encargsorres))
163 return batchable.next()
164 setattr(plain, 'batchable', f)
165 return plain
166
99
167 # list of nodes encoding / decoding
100 # list of nodes encoding / decoding
168
101
@@ -5,7 +5,8 b''
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from mercurial.wireproto import localbatch, remotebatch, batchable, future
8 from mercurial.peer import localbatch, batchable, future
9 from mercurial.wireproto import remotebatch
9
10
10 # equivalent of repo.repository
11 # equivalent of repo.repository
11 class thing(object):
12 class thing(object):
@@ -12,6 +12,10 b' class proto(object):'
12 class clientpeer(wireproto.wirepeer):
12 class clientpeer(wireproto.wirepeer):
13 def __init__(self, serverrepo):
13 def __init__(self, serverrepo):
14 self.serverrepo = serverrepo
14 self.serverrepo = serverrepo
15
16 def _capabilities(self):
17 return ['batch']
18
15 def _call(self, cmd, **args):
19 def _call(self, cmd, **args):
16 return wireproto.dispatch(self.serverrepo, proto(args), cmd)
20 return wireproto.dispatch(self.serverrepo, proto(args), cmd)
17
21
General Comments 0
You need to be logged in to leave comments. Login now