##// END OF EJS Templates
wireprotov1peer: update all rpcs to use the new batchable scheme...
Valentin Gatien-Baron -
r48687:c424ff48 default
parent child Browse files
Show More
@@ -140,12 +140,10 def peersetup(ui, peer):
140 140 def getannotate(self, path, lastnode=None):
141 141 if not self.capable(b'getannotate'):
142 142 ui.warn(_(b'remote peer cannot provide annotate cache\n'))
143 yield None, None
143 return None, None
144 144 else:
145 145 args = {b'path': path, b'lastnode': lastnode or b''}
146 f = wireprotov1peer.future()
147 yield args, f
148 yield _parseresponse(f.value)
146 return args, _parseresponse
149 147
150 148 peer.__class__ = fastannotatepeer
151 149
@@ -431,18 +431,19 def localrepolistkeys(orig, self, namesp
431 431 @wireprotov1peer.batchable
432 432 def listkeyspatterns(self, namespace, patterns):
433 433 if not self.capable(b'pushkey'):
434 yield {}, None
435 f = wireprotov1peer.future()
434 return {}, None
436 435 self.ui.debug(b'preparing listkeys for "%s"\n' % namespace)
437 yield {
436
437 def decode(d):
438 self.ui.debug(
439 b'received listkey for "%s": %i bytes\n' % (namespace, len(d))
440 )
441 return pushkey.decodekeys(d)
442
443 return {
438 444 b'namespace': encoding.fromlocal(namespace),
439 445 b'patterns': wireprototypes.encodelist(patterns),
440 }, f
441 d = f.value
442 self.ui.debug(
443 b'received listkey for "%s": %i bytes\n' % (namespace, len(d))
444 )
445 yield pushkey.decodekeys(d)
446 }, decode
446 447
447 448
448 449 def _readbundlerevs(bundlerepo):
@@ -184,17 +184,18 def wirereposetup(ui, repo):
184 184
185 185 @wireprotov1peer.batchable
186 186 def statlfile(self, sha):
187 f = wireprotov1peer.future()
187 def decode(d):
188 try:
189 return int(d)
190 except (ValueError, urlerr.httperror):
191 # If the server returns anything but an integer followed by a
192 # newline, newline, it's not speaking our language; if we get
193 # an HTTP error, we can't be sure the largefile is present;
194 # either way, consider it missing.
195 return 2
196
188 197 result = {b'sha': sha}
189 yield result, f
190 try:
191 yield int(f.value)
192 except (ValueError, urlerr.httperror):
193 # If the server returns anything but an integer followed by a
194 # newline, newline, it's not speaking our language; if we get
195 # an HTTP error, we can't be sure the largefile is present;
196 # either way, consider it missing.
197 yield 2
198 return result, decode
198 199
199 200 repo.__class__ = lfileswirerepository
200 201
@@ -63,12 +63,14 def peersetup(ui, peer):
63 63 raise error.Abort(
64 64 b'configured remotefile server does not support getfile'
65 65 )
66 f = wireprotov1peer.future()
67 yield {b'file': file, b'node': node}, f
68 code, data = f.value.split(b'\0', 1)
69 if int(code):
70 raise error.LookupError(file, node, data)
71 yield data
66
67 def decode(d):
68 code, data = d.split(b'\0', 1)
69 if int(code):
70 raise error.LookupError(file, node, data)
71 return data
72
73 return {b'file': file, b'node': node}, decode
72 74
73 75 @wireprotov1peer.batchable
74 76 def x_rfl_getflogheads(self, path):
@@ -77,10 +79,11 def peersetup(ui, peer):
77 79 b'configured remotefile server does not '
78 80 b'support getflogheads'
79 81 )
80 f = wireprotov1peer.future()
81 yield {b'path': path}, f
82 heads = f.value.split(b'\n') if f.value else []
83 yield heads
82
83 def decode(d):
84 return d.split(b'\n') if d else []
85
86 return {b'path': path}, decode
84 87
85 88 def _updatecallstreamopts(self, command, opts):
86 89 if command != b'getbundle':
@@ -35,7 +35,7 from .utils import hashutil
35 35 urlreq = util.urlreq
36 36
37 37
38 def batchable_new_style(f):
38 def batchable(f):
39 39 """annotation for batchable methods
40 40
41 41 Such methods must implement a coroutine as follows:
@@ -68,33 +68,6 def batchable_new_style(f):
68 68 return plain
69 69
70 70
71 def batchable(f):
72 def upgraded(*args, **opts):
73 batchable = f(*args, **opts)
74 encoded_args_or_res, encoded_res_future = next(batchable)
75 if not encoded_res_future:
76 decode = None
77 else:
78
79 def decode(d):
80 encoded_res_future.set(d)
81 return next(batchable)
82
83 return encoded_args_or_res, decode
84
85 setattr(upgraded, '__name__', f.__name__)
86 return batchable_new_style(upgraded)
87
88
89 class future(object):
90 '''placeholder for a value to be set later'''
91
92 def set(self, value):
93 if util.safehasattr(self, b'value'):
94 raise error.RepoError(b"future is already set")
95 self.value = value
96
97
98 71 def encodebatchcmds(req):
99 72 """Return a ``cmds`` argument value for the ``batch`` command."""
100 73 escapearg = wireprototypes.escapebatcharg
@@ -372,87 +345,90 class wirepeer(repository.peer):
372 345 @batchable
373 346 def lookup(self, key):
374 347 self.requirecap(b'lookup', _(b'look up remote revision'))
375 f = future()
376 yield {b'key': encoding.fromlocal(key)}, f
377 d = f.value
378 success, data = d[:-1].split(b" ", 1)
379 if int(success):
380 yield bin(data)
381 else:
382 self._abort(error.RepoError(data))
348
349 def decode(d):
350 success, data = d[:-1].split(b" ", 1)
351 if int(success):
352 return bin(data)
353 else:
354 self._abort(error.RepoError(data))
355
356 return {b'key': encoding.fromlocal(key)}, decode
383 357
384 358 @batchable
385 359 def heads(self):
386 f = future()
387 yield {}, f
388 d = f.value
389 try:
390 yield wireprototypes.decodelist(d[:-1])
391 except ValueError:
392 self._abort(error.ResponseError(_(b"unexpected response:"), d))
360 def decode(d):
361 try:
362 return wireprototypes.decodelist(d[:-1])
363 except ValueError:
364 self._abort(error.ResponseError(_(b"unexpected response:"), d))
365
366 return {}, decode
393 367
394 368 @batchable
395 369 def known(self, nodes):
396 f = future()
397 yield {b'nodes': wireprototypes.encodelist(nodes)}, f
398 d = f.value
399 try:
400 yield [bool(int(b)) for b in pycompat.iterbytestr(d)]
401 except ValueError:
402 self._abort(error.ResponseError(_(b"unexpected response:"), d))
370 def decode(d):
371 try:
372 return [bool(int(b)) for b in pycompat.iterbytestr(d)]
373 except ValueError:
374 self._abort(error.ResponseError(_(b"unexpected response:"), d))
375
376 return {b'nodes': wireprototypes.encodelist(nodes)}, decode
403 377
404 378 @batchable
405 379 def branchmap(self):
406 f = future()
407 yield {}, f
408 d = f.value
409 try:
410 branchmap = {}
411 for branchpart in d.splitlines():
412 branchname, branchheads = branchpart.split(b' ', 1)
413 branchname = encoding.tolocal(urlreq.unquote(branchname))
414 branchheads = wireprototypes.decodelist(branchheads)
415 branchmap[branchname] = branchheads
416 yield branchmap
417 except TypeError:
418 self._abort(error.ResponseError(_(b"unexpected response:"), d))
380 def decode(d):
381 try:
382 branchmap = {}
383 for branchpart in d.splitlines():
384 branchname, branchheads = branchpart.split(b' ', 1)
385 branchname = encoding.tolocal(urlreq.unquote(branchname))
386 branchheads = wireprototypes.decodelist(branchheads)
387 branchmap[branchname] = branchheads
388 return branchmap
389 except TypeError:
390 self._abort(error.ResponseError(_(b"unexpected response:"), d))
391
392 return {}, decode
419 393
420 394 @batchable
421 395 def listkeys(self, namespace):
422 396 if not self.capable(b'pushkey'):
423 yield {}, None
424 f = future()
397 return {}, None
425 398 self.ui.debug(b'preparing listkeys for "%s"\n' % namespace)
426 yield {b'namespace': encoding.fromlocal(namespace)}, f
427 d = f.value
428 self.ui.debug(
429 b'received listkey for "%s": %i bytes\n' % (namespace, len(d))
430 )
431 yield pushkeymod.decodekeys(d)
399
400 def decode(d):
401 self.ui.debug(
402 b'received listkey for "%s": %i bytes\n' % (namespace, len(d))
403 )
404 return pushkeymod.decodekeys(d)
405
406 return {b'namespace': encoding.fromlocal(namespace)}, decode
432 407
433 408 @batchable
434 409 def pushkey(self, namespace, key, old, new):
435 410 if not self.capable(b'pushkey'):
436 yield False, None
437 f = future()
411 return False, None
438 412 self.ui.debug(b'preparing pushkey for "%s:%s"\n' % (namespace, key))
439 yield {
413
414 def decode(d):
415 d, output = d.split(b'\n', 1)
416 try:
417 d = bool(int(d))
418 except ValueError:
419 raise error.ResponseError(
420 _(b'push failed (unexpected response):'), d
421 )
422 for l in output.splitlines(True):
423 self.ui.status(_(b'remote: '), l)
424 return d
425
426 return {
440 427 b'namespace': encoding.fromlocal(namespace),
441 428 b'key': encoding.fromlocal(key),
442 429 b'old': encoding.fromlocal(old),
443 430 b'new': encoding.fromlocal(new),
444 }, f
445 d = f.value
446 d, output = d.split(b'\n', 1)
447 try:
448 d = bool(int(d))
449 except ValueError:
450 raise error.ResponseError(
451 _(b'push failed (unexpected response):'), d
452 )
453 for l in output.splitlines(True):
454 self.ui.status(_(b'remote: '), l)
455 yield d
431 }, decode
456 432
457 433 def stream_out(self):
458 434 return self._callstream(b'stream_out')
@@ -214,14 +214,11 class remotething(thing):
214 214 mangle(two),
215 215 ),
216 216 ]
217 encoded_res_future = wireprotov1peer.future()
218 yield encoded_args, encoded_res_future
219 yield unmangle(encoded_res_future.value)
217 return encoded_args, unmangle
220 218
221 219 @wireprotov1peer.batchable
222 220 def bar(self, b, a):
223 encresref = wireprotov1peer.future()
224 yield [
221 return [
225 222 (
226 223 b'b',
227 224 mangle(b),
@@ -230,8 +227,7 class remotething(thing):
230 227 b'a',
231 228 mangle(a),
232 229 ),
233 ], encresref
234 yield unmangle(encresref.value)
230 ], unmangle
235 231
236 232 # greet is coded directly. It therefore does not support batching. If it
237 233 # does appear in a batch, the batch is split around greet, and the call to
@@ -75,9 +75,7 class clientpeer(wireprotov1peer.wirepee
75 75
76 76 @wireprotov1peer.batchable
77 77 def greet(self, name):
78 f = wireprotov1peer.future()
79 yield {b'name': mangle(name)}, f
80 yield unmangle(f.value)
78 return {b'name': mangle(name)}, unmangle
81 79
82 80
83 81 class serverrepo(object):
General Comments 0
You need to be logged in to leave comments. Login now