##// END OF EJS Templates
wireprotov1peer: simplify the way batchable rpcs are defined...
Valentin Gatien-Baron -
r48686:cdad6560 default
parent child Browse files
Show More
@@ -35,7 +35,7 b' from .utils import hashutil'
35 urlreq = util.urlreq
35 urlreq = util.urlreq
36
36
37
37
38 def batchable(f):
38 def batchable_new_style(f):
39 """annotation for batchable methods
39 """annotation for batchable methods
40
40
41 Such methods must implement a coroutine as follows:
41 Such methods must implement a coroutine as follows:
@@ -44,13 +44,9 b' def batchable(f):'
44 def sample(self, one, two=None):
44 def sample(self, one, two=None):
45 # Build list of encoded arguments suitable for your wire protocol:
45 # Build list of encoded arguments suitable for your wire protocol:
46 encoded_args = [('one', encode(one),), ('two', encode(two),)]
46 encoded_args = [('one', encode(one),), ('two', encode(two),)]
47 # Create future for injection of encoded result:
47 # Return it, along with a function that will receive the result
48 encoded_res_future = future()
48 # from the batched request.
49 # Return encoded arguments and future:
49 return encoded_args, decode
50 yield encoded_args, encoded_res_future
51 # Assuming the future to be filled with the result from the batched
52 # request now. Decode it:
53 yield decode(encoded_res_future.value)
54
50
55 The decorator returns a function which wraps this coroutine as a plain
51 The decorator returns a function which wraps this coroutine as a plain
56 method, but adds the original method as an attribute called "batchable",
52 method, but adds the original method as an attribute called "batchable",
@@ -59,20 +55,37 b' def batchable(f):'
59 """
55 """
60
56
61 def plain(*args, **opts):
57 def plain(*args, **opts):
62 batchable = f(*args, **opts)
58 encoded_args_or_res, decode = f(*args, **opts)
63 encoded_args_or_res, encoded_res_future = next(batchable)
59 if not decode:
64 if not encoded_res_future:
65 return encoded_args_or_res # a local result in this case
60 return encoded_args_or_res # a local result in this case
66 self = args[0]
61 self = args[0]
67 cmd = pycompat.bytesurl(f.__name__) # ensure cmd is ascii bytestr
62 cmd = pycompat.bytesurl(f.__name__) # ensure cmd is ascii bytestr
68 encoded_res_future.set(self._submitone(cmd, encoded_args_or_res))
63 encoded_res = self._submitone(cmd, encoded_args_or_res)
69 return next(batchable)
64 return decode(encoded_res)
70
65
71 setattr(plain, 'batchable', f)
66 setattr(plain, 'batchable', f)
72 setattr(plain, '__name__', f.__name__)
67 setattr(plain, '__name__', f.__name__)
73 return plain
68 return plain
74
69
75
70
71 def batchable(f):
72 def upgraded(*args, **opts):
73 batchable = f(*args, **opts)
74 encoded_args_or_res, encoded_res_future = next(batchable)
75 if not encoded_res_future:
76 decode = None
77 else:
78
79 def decode(d):
80 encoded_res_future.set(d)
81 return next(batchable)
82
83 return encoded_args_or_res, decode
84
85 setattr(upgraded, '__name__', f.__name__)
86 return batchable_new_style(upgraded)
87
88
76 class future(object):
89 class future(object):
77 '''placeholder for a value to be set later'''
90 '''placeholder for a value to be set later'''
78
91
@@ -248,25 +261,18 b' class peerexecutor(object):'
248 continue
261 continue
249
262
250 try:
263 try:
251 batchable = fn.batchable(
264 encoded_args_or_res, decode = fn.batchable(
252 fn.__self__, **pycompat.strkwargs(args)
265 fn.__self__, **pycompat.strkwargs(args)
253 )
266 )
254 except Exception:
267 except Exception:
255 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
268 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
256 return
269 return
257
270
258 # Encoded arguments and future holding remote result.
271 if not decode:
259 try:
260 encoded_args_or_res, fremote = next(batchable)
261 except Exception:
262 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
263 return
264
265 if not fremote:
266 f.set_result(encoded_args_or_res)
272 f.set_result(encoded_args_or_res)
267 else:
273 else:
268 requests.append((command, encoded_args_or_res))
274 requests.append((command, encoded_args_or_res))
269 states.append((command, f, batchable, fremote))
275 states.append((command, f, batchable, decode))
270
276
271 if not requests:
277 if not requests:
272 return
278 return
@@ -319,7 +325,7 b' class peerexecutor(object):'
319 def _readbatchresponse(self, states, wireresults):
325 def _readbatchresponse(self, states, wireresults):
320 # Executes in a thread to read data off the wire.
326 # Executes in a thread to read data off the wire.
321
327
322 for command, f, batchable, fremote in states:
328 for command, f, batchable, decode in states:
323 # Grab raw result off the wire and teach the internal future
329 # Grab raw result off the wire and teach the internal future
324 # about it.
330 # about it.
325 try:
331 try:
@@ -334,11 +340,8 b' class peerexecutor(object):'
334 )
340 )
335 )
341 )
336 else:
342 else:
337 fremote.set(remoteresult)
338
339 # And ask the coroutine to decode that value.
340 try:
343 try:
341 result = next(batchable)
344 result = decode(remoteresult)
342 except Exception:
345 except Exception:
343 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
346 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
344 else:
347 else:
General Comments 0
You need to be logged in to leave comments. Login now