##// END OF EJS Templates
safehasattr: pass attribute name as string instead of bytes...
marmoute -
r51444:b1fb4185 default
parent child Browse files
Show More
@@ -1,648 +1,648 b''
1 1 # wireprotov1peer.py - Client-side functionality for wire protocol version 1.
2 2 #
3 3 # Copyright 2005-2010 Olivia Mackall <olivia@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import sys
10 10 import weakref
11 11
12 12 from concurrent import futures
13 13 from .i18n import _
14 14 from .node import bin
15 15 from .pycompat import (
16 16 getattr,
17 17 setattr,
18 18 )
19 19 from . import (
20 20 bundle2,
21 21 changegroup as changegroupmod,
22 22 encoding,
23 23 error,
24 24 pushkey as pushkeymod,
25 25 pycompat,
26 26 util,
27 27 wireprototypes,
28 28 )
29 29 from .interfaces import (
30 30 repository,
31 31 util as interfaceutil,
32 32 )
33 33 from .utils import hashutil
34 34
35 35 urlreq = util.urlreq
36 36
37 37
38 38 def batchable(f):
39 39 """annotation for batchable methods
40 40
41 41 Such methods must implement a coroutine as follows:
42 42
43 43 @batchable
44 44 def sample(self, one, two=None):
45 45 # Build list of encoded arguments suitable for your wire protocol:
46 46 encoded_args = [('one', encode(one),), ('two', encode(two),)]
47 47 # Return it, along with a function that will receive the result
48 48 # from the batched request.
49 49 return encoded_args, decode
50 50
51 51 The decorator returns a function which wraps this coroutine as a plain
52 52 method, but adds the original method as an attribute called "batchable",
53 53 which is used by remotebatch to split the call into separate encoding and
54 54 decoding phases.
55 55 """
56 56
57 57 def plain(*args, **opts):
58 58 encoded_args_or_res, decode = f(*args, **opts)
59 59 if not decode:
60 60 return encoded_args_or_res # a local result in this case
61 61 self = args[0]
62 62 cmd = pycompat.bytesurl(f.__name__) # ensure cmd is ascii bytestr
63 63 encoded_res = self._submitone(cmd, encoded_args_or_res)
64 64 return decode(encoded_res)
65 65
66 66 setattr(plain, 'batchable', f)
67 67 setattr(plain, '__name__', f.__name__)
68 68 return plain
69 69
70 70
71 71 def encodebatchcmds(req):
72 72 """Return a ``cmds`` argument value for the ``batch`` command."""
73 73 escapearg = wireprototypes.escapebatcharg
74 74
75 75 cmds = []
76 76 for op, argsdict in req:
77 77 # Old servers didn't properly unescape argument names. So prevent
78 78 # the sending of argument names that may not be decoded properly by
79 79 # servers.
80 80 assert all(escapearg(k) == k for k in argsdict)
81 81
82 82 args = b','.join(
83 83 b'%s=%s' % (escapearg(k), escapearg(v)) for k, v in argsdict.items()
84 84 )
85 85 cmds.append(b'%s %s' % (op, args))
86 86
87 87 return b';'.join(cmds)
88 88
89 89
90 90 class unsentfuture(futures.Future):
91 91 """A Future variation to represent an unsent command.
92 92
93 93 Because we buffer commands and don't submit them immediately, calling
94 94 ``result()`` on an unsent future could deadlock. Futures for buffered
95 95 commands are represented by this type, which wraps ``result()`` to
96 96 call ``sendcommands()``.
97 97 """
98 98
99 99 def result(self, timeout=None):
100 100 if self.done():
101 101 return futures.Future.result(self, timeout)
102 102
103 103 self._peerexecutor.sendcommands()
104 104
105 105 # This looks like it will infinitely recurse. However,
106 106 # sendcommands() should modify __class__. This call serves as a check
107 107 # on that.
108 108 return self.result(timeout)
109 109
110 110
111 111 @interfaceutil.implementer(repository.ipeercommandexecutor)
112 112 class peerexecutor:
113 113 def __init__(self, peer):
114 114 self._peer = peer
115 115 self._sent = False
116 116 self._closed = False
117 117 self._calls = []
118 118 self._futures = weakref.WeakSet()
119 119 self._responseexecutor = None
120 120 self._responsef = None
121 121
122 122 def __enter__(self):
123 123 return self
124 124
125 125 def __exit__(self, exctype, excvalee, exctb):
126 126 self.close()
127 127
128 128 def callcommand(self, command, args):
129 129 if self._sent:
130 130 raise error.ProgrammingError(
131 131 b'callcommand() cannot be used after commands are sent'
132 132 )
133 133
134 134 if self._closed:
135 135 raise error.ProgrammingError(
136 136 b'callcommand() cannot be used after close()'
137 137 )
138 138
139 139 # Commands are dispatched through methods on the peer.
140 140 fn = getattr(self._peer, pycompat.sysstr(command), None)
141 141
142 142 if not fn:
143 143 raise error.ProgrammingError(
144 144 b'cannot call command %s: method of same name not available '
145 145 b'on peer' % command
146 146 )
147 147
148 148 # Commands are either batchable or they aren't. If a command
149 149 # isn't batchable, we send it immediately because the executor
150 150 # can no longer accept new commands after a non-batchable command.
151 151 # If a command is batchable, we queue it for later. But we have
152 152 # to account for the case of a non-batchable command arriving after
153 153 # a batchable one and refuse to service it.
154 154
155 155 def addcall():
156 156 f = futures.Future()
157 157 self._futures.add(f)
158 158 self._calls.append((command, args, fn, f))
159 159 return f
160 160
161 161 if getattr(fn, 'batchable', False):
162 162 f = addcall()
163 163
164 164 # But since we don't issue it immediately, we wrap its result()
165 165 # to trigger sending so we avoid deadlocks.
166 166 f.__class__ = unsentfuture
167 167 f._peerexecutor = self
168 168 else:
169 169 if self._calls:
170 170 raise error.ProgrammingError(
171 171 b'%s is not batchable and cannot be called on a command '
172 172 b'executor along with other commands' % command
173 173 )
174 174
175 175 f = addcall()
176 176
177 177 # Non-batchable commands can never coexist with another command
178 178 # in this executor. So send the command immediately.
179 179 self.sendcommands()
180 180
181 181 return f
182 182
183 183 def sendcommands(self):
184 184 if self._sent:
185 185 return
186 186
187 187 if not self._calls:
188 188 return
189 189
190 190 self._sent = True
191 191
192 192 # Unhack any future types so caller seens a clean type and to break
193 193 # cycle between us and futures.
194 194 for f in self._futures:
195 195 if isinstance(f, unsentfuture):
196 196 f.__class__ = futures.Future
197 197 f._peerexecutor = None
198 198
199 199 calls = self._calls
200 200 # Mainly to destroy references to futures.
201 201 self._calls = None
202 202
203 203 # Simple case of a single command. We call it synchronously.
204 204 if len(calls) == 1:
205 205 command, args, fn, f = calls[0]
206 206
207 207 # Future was cancelled. Ignore it.
208 208 if not f.set_running_or_notify_cancel():
209 209 return
210 210
211 211 try:
212 212 result = fn(**pycompat.strkwargs(args))
213 213 except Exception:
214 214 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
215 215 else:
216 216 f.set_result(result)
217 217
218 218 return
219 219
220 220 # Batch commands are a bit harder. First, we have to deal with the
221 221 # @batchable coroutine. That's a bit annoying. Furthermore, we also
222 222 # need to preserve streaming. i.e. it should be possible for the
223 223 # futures to resolve as data is coming in off the wire without having
224 224 # to wait for the final byte of the final response. We do this by
225 225 # spinning up a thread to read the responses.
226 226
227 227 requests = []
228 228 states = []
229 229
230 230 for command, args, fn, f in calls:
231 231 # Future was cancelled. Ignore it.
232 232 if not f.set_running_or_notify_cancel():
233 233 continue
234 234
235 235 try:
236 236 encoded_args_or_res, decode = fn.batchable(
237 237 fn.__self__, **pycompat.strkwargs(args)
238 238 )
239 239 except Exception:
240 240 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
241 241 return
242 242
243 243 if not decode:
244 244 f.set_result(encoded_args_or_res)
245 245 else:
246 246 requests.append((command, encoded_args_or_res))
247 247 states.append((command, f, batchable, decode))
248 248
249 249 if not requests:
250 250 return
251 251
252 252 # This will emit responses in order they were executed.
253 253 wireresults = self._peer._submitbatch(requests)
254 254
255 255 # The use of a thread pool executor here is a bit weird for something
256 256 # that only spins up a single thread. However, thread management is
257 257 # hard and it is easy to encounter race conditions, deadlocks, etc.
258 258 # concurrent.futures already solves these problems and its thread pool
259 259 # executor has minimal overhead. So we use it.
260 260 self._responseexecutor = futures.ThreadPoolExecutor(1)
261 261 self._responsef = self._responseexecutor.submit(
262 262 self._readbatchresponse, states, wireresults
263 263 )
264 264
265 265 def close(self):
266 266 self.sendcommands()
267 267
268 268 if self._closed:
269 269 return
270 270
271 271 self._closed = True
272 272
273 273 if not self._responsef:
274 274 return
275 275
276 276 # We need to wait on our in-flight response and then shut down the
277 277 # executor once we have a result.
278 278 try:
279 279 self._responsef.result()
280 280 finally:
281 281 self._responseexecutor.shutdown(wait=True)
282 282 self._responsef = None
283 283 self._responseexecutor = None
284 284
285 285 # If any of our futures are still in progress, mark them as
286 286 # errored. Otherwise a result() could wait indefinitely.
287 287 for f in self._futures:
288 288 if not f.done():
289 289 f.set_exception(
290 290 error.ResponseError(
291 291 _(b'unfulfilled batch command response'), None
292 292 )
293 293 )
294 294
295 295 self._futures = None
296 296
297 297 def _readbatchresponse(self, states, wireresults):
298 298 # Executes in a thread to read data off the wire.
299 299
300 300 for command, f, batchable, decode in states:
301 301 # Grab raw result off the wire and teach the internal future
302 302 # about it.
303 303 try:
304 304 remoteresult = next(wireresults)
305 305 except StopIteration:
306 306 # This can happen in particular because next(batchable)
307 307 # in the previous iteration can call peer._abort, which
308 308 # may close the peer.
309 309 f.set_exception(
310 310 error.ResponseError(
311 311 _(b'unfulfilled batch command response'), None
312 312 )
313 313 )
314 314 else:
315 315 try:
316 316 result = decode(remoteresult)
317 317 except Exception:
318 318 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
319 319 else:
320 320 f.set_result(result)
321 321
322 322
323 323 @interfaceutil.implementer(
324 324 repository.ipeercommands, repository.ipeerlegacycommands
325 325 )
326 326 class wirepeer(repository.peer):
327 327 """Client-side interface for communicating with a peer repository.
328 328
329 329 Methods commonly call wire protocol commands of the same name.
330 330
331 331 See also httppeer.py and sshpeer.py for protocol-specific
332 332 implementations of this interface.
333 333 """
334 334
335 335 def commandexecutor(self):
336 336 return peerexecutor(self)
337 337
338 338 # Begin of ipeercommands interface.
339 339
340 340 def clonebundles(self):
341 341 self.requirecap(b'clonebundles', _(b'clone bundles'))
342 342 return self._call(b'clonebundles')
343 343
344 344 @batchable
345 345 def lookup(self, key):
346 346 self.requirecap(b'lookup', _(b'look up remote revision'))
347 347
348 348 def decode(d):
349 349 success, data = d[:-1].split(b" ", 1)
350 350 if int(success):
351 351 return bin(data)
352 352 else:
353 353 self._abort(error.RepoError(data))
354 354
355 355 return {b'key': encoding.fromlocal(key)}, decode
356 356
357 357 @batchable
358 358 def heads(self):
359 359 def decode(d):
360 360 try:
361 361 return wireprototypes.decodelist(d[:-1])
362 362 except ValueError:
363 363 self._abort(error.ResponseError(_(b"unexpected response:"), d))
364 364
365 365 return {}, decode
366 366
367 367 @batchable
368 368 def known(self, nodes):
369 369 def decode(d):
370 370 try:
371 371 return [bool(int(b)) for b in pycompat.iterbytestr(d)]
372 372 except ValueError:
373 373 self._abort(error.ResponseError(_(b"unexpected response:"), d))
374 374
375 375 return {b'nodes': wireprototypes.encodelist(nodes)}, decode
376 376
377 377 @batchable
378 378 def branchmap(self):
379 379 def decode(d):
380 380 try:
381 381 branchmap = {}
382 382 for branchpart in d.splitlines():
383 383 branchname, branchheads = branchpart.split(b' ', 1)
384 384 branchname = encoding.tolocal(urlreq.unquote(branchname))
385 385 branchheads = wireprototypes.decodelist(branchheads)
386 386 branchmap[branchname] = branchheads
387 387 return branchmap
388 388 except TypeError:
389 389 self._abort(error.ResponseError(_(b"unexpected response:"), d))
390 390
391 391 return {}, decode
392 392
393 393 @batchable
394 394 def listkeys(self, namespace):
395 395 if not self.capable(b'pushkey'):
396 396 return {}, None
397 397 self.ui.debug(b'preparing listkeys for "%s"\n' % namespace)
398 398
399 399 def decode(d):
400 400 self.ui.debug(
401 401 b'received listkey for "%s": %i bytes\n' % (namespace, len(d))
402 402 )
403 403 return pushkeymod.decodekeys(d)
404 404
405 405 return {b'namespace': encoding.fromlocal(namespace)}, decode
406 406
407 407 @batchable
408 408 def pushkey(self, namespace, key, old, new):
409 409 if not self.capable(b'pushkey'):
410 410 return False, None
411 411 self.ui.debug(b'preparing pushkey for "%s:%s"\n' % (namespace, key))
412 412
413 413 def decode(d):
414 414 d, output = d.split(b'\n', 1)
415 415 try:
416 416 d = bool(int(d))
417 417 except ValueError:
418 418 raise error.ResponseError(
419 419 _(b'push failed (unexpected response):'), d
420 420 )
421 421 for l in output.splitlines(True):
422 422 self.ui.status(_(b'remote: '), l)
423 423 return d
424 424
425 425 return {
426 426 b'namespace': encoding.fromlocal(namespace),
427 427 b'key': encoding.fromlocal(key),
428 428 b'old': encoding.fromlocal(old),
429 429 b'new': encoding.fromlocal(new),
430 430 }, decode
431 431
432 432 def stream_out(self):
433 433 return self._callstream(b'stream_out')
434 434
435 435 def getbundle(self, source, **kwargs):
436 436 kwargs = pycompat.byteskwargs(kwargs)
437 437 self.requirecap(b'getbundle', _(b'look up remote changes'))
438 438 opts = {}
439 439 bundlecaps = kwargs.get(b'bundlecaps') or set()
440 440 for key, value in kwargs.items():
441 441 if value is None:
442 442 continue
443 443 keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
444 444 if keytype is None:
445 445 raise error.ProgrammingError(
446 446 b'Unexpectedly None keytype for key %s' % key
447 447 )
448 448 elif keytype == b'nodes':
449 449 value = wireprototypes.encodelist(value)
450 450 elif keytype == b'csv':
451 451 value = b','.join(value)
452 452 elif keytype == b'scsv':
453 453 value = b','.join(sorted(value))
454 454 elif keytype == b'boolean':
455 455 value = b'%i' % bool(value)
456 456 elif keytype != b'plain':
457 457 raise KeyError(b'unknown getbundle option type %s' % keytype)
458 458 opts[key] = value
459 459 f = self._callcompressable(b"getbundle", **pycompat.strkwargs(opts))
460 460 if any((cap.startswith(b'HG2') for cap in bundlecaps)):
461 461 return bundle2.getunbundler(self.ui, f)
462 462 else:
463 463 return changegroupmod.cg1unpacker(f, b'UN')
464 464
465 465 def unbundle(self, bundle, heads, url):
466 466 """Send cg (a readable file-like object representing the
467 467 changegroup to push, typically a chunkbuffer object) to the
468 468 remote server as a bundle.
469 469
470 470 When pushing a bundle10 stream, return an integer indicating the
471 471 result of the push (see changegroup.apply()).
472 472
473 473 When pushing a bundle20 stream, return a bundle20 stream.
474 474
475 475 `url` is the url the client thinks it's pushing to, which is
476 476 visible to hooks.
477 477 """
478 478
479 479 if heads != [b'force'] and self.capable(b'unbundlehash'):
480 480 heads = wireprototypes.encodelist(
481 481 [b'hashed', hashutil.sha1(b''.join(sorted(heads))).digest()]
482 482 )
483 483 else:
484 484 heads = wireprototypes.encodelist(heads)
485 485
486 if util.safehasattr(bundle, b'deltaheader'):
486 if util.safehasattr(bundle, 'deltaheader'):
487 487 # this a bundle10, do the old style call sequence
488 488 ret, output = self._callpush(b"unbundle", bundle, heads=heads)
489 489 if ret == b"":
490 490 raise error.ResponseError(_(b'push failed:'), output)
491 491 try:
492 492 ret = int(ret)
493 493 except ValueError:
494 494 raise error.ResponseError(
495 495 _(b'push failed (unexpected response):'), ret
496 496 )
497 497
498 498 for l in output.splitlines(True):
499 499 self.ui.status(_(b'remote: '), l)
500 500 else:
501 501 # bundle2 push. Send a stream, fetch a stream.
502 502 stream = self._calltwowaystream(b'unbundle', bundle, heads=heads)
503 503 ret = bundle2.getunbundler(self.ui, stream)
504 504 return ret
505 505
506 506 # End of ipeercommands interface.
507 507
508 508 # Begin of ipeerlegacycommands interface.
509 509
510 510 def branches(self, nodes):
511 511 n = wireprototypes.encodelist(nodes)
512 512 d = self._call(b"branches", nodes=n)
513 513 try:
514 514 br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
515 515 return br
516 516 except ValueError:
517 517 self._abort(error.ResponseError(_(b"unexpected response:"), d))
518 518
519 519 def between(self, pairs):
520 520 batch = 8 # avoid giant requests
521 521 r = []
522 522 for i in range(0, len(pairs), batch):
523 523 n = b" ".join(
524 524 [
525 525 wireprototypes.encodelist(p, b'-')
526 526 for p in pairs[i : i + batch]
527 527 ]
528 528 )
529 529 d = self._call(b"between", pairs=n)
530 530 try:
531 531 r.extend(
532 532 l and wireprototypes.decodelist(l) or []
533 533 for l in d.splitlines()
534 534 )
535 535 except ValueError:
536 536 self._abort(error.ResponseError(_(b"unexpected response:"), d))
537 537 return r
538 538
539 539 def changegroup(self, nodes, source):
540 540 n = wireprototypes.encodelist(nodes)
541 541 f = self._callcompressable(b"changegroup", roots=n)
542 542 return changegroupmod.cg1unpacker(f, b'UN')
543 543
544 544 def changegroupsubset(self, bases, heads, source):
545 545 self.requirecap(b'changegroupsubset', _(b'look up remote changes'))
546 546 bases = wireprototypes.encodelist(bases)
547 547 heads = wireprototypes.encodelist(heads)
548 548 f = self._callcompressable(
549 549 b"changegroupsubset", bases=bases, heads=heads
550 550 )
551 551 return changegroupmod.cg1unpacker(f, b'UN')
552 552
553 553 # End of ipeerlegacycommands interface.
554 554
555 555 def _submitbatch(self, req):
556 556 """run batch request <req> on the server
557 557
558 558 Returns an iterator of the raw responses from the server.
559 559 """
560 560 ui = self.ui
561 561 if ui.debugflag and ui.configbool(b'devel', b'debug.peer-request'):
562 562 ui.debug(b'devel-peer-request: batched-content\n')
563 563 for op, args in req:
564 564 msg = b'devel-peer-request: - %s (%d arguments)\n'
565 565 ui.debug(msg % (op, len(args)))
566 566
567 567 unescapearg = wireprototypes.unescapebatcharg
568 568
569 569 rsp = self._callstream(b"batch", cmds=encodebatchcmds(req))
570 570 chunk = rsp.read(1024)
571 571 work = [chunk]
572 572 while chunk:
573 573 while b';' not in chunk and chunk:
574 574 chunk = rsp.read(1024)
575 575 work.append(chunk)
576 576 merged = b''.join(work)
577 577 while b';' in merged:
578 578 one, merged = merged.split(b';', 1)
579 579 yield unescapearg(one)
580 580 chunk = rsp.read(1024)
581 581 work = [merged, chunk]
582 582 yield unescapearg(b''.join(work))
583 583
584 584 def _submitone(self, op, args):
585 585 return self._call(op, **pycompat.strkwargs(args))
586 586
587 587 def debugwireargs(self, one, two, three=None, four=None, five=None):
588 588 # don't pass optional arguments left at their default value
589 589 opts = {}
590 590 if three is not None:
591 591 opts['three'] = three
592 592 if four is not None:
593 593 opts['four'] = four
594 594 return self._call(b'debugwireargs', one=one, two=two, **opts)
595 595
596 596 def _call(self, cmd, **args):
597 597 """execute <cmd> on the server
598 598
599 599 The command is expected to return a simple string.
600 600
601 601 returns the server reply as a string."""
602 602 raise NotImplementedError()
603 603
604 604 def _callstream(self, cmd, **args):
605 605 """execute <cmd> on the server
606 606
607 607 The command is expected to return a stream. Note that if the
608 608 command doesn't return a stream, _callstream behaves
609 609 differently for ssh and http peers.
610 610
611 611 returns the server reply as a file like object.
612 612 """
613 613 raise NotImplementedError()
614 614
615 615 def _callcompressable(self, cmd, **args):
616 616 """execute <cmd> on the server
617 617
618 618 The command is expected to return a stream.
619 619
620 620 The stream may have been compressed in some implementations. This
621 621 function takes care of the decompression. This is the only difference
622 622 with _callstream.
623 623
624 624 returns the server reply as a file like object.
625 625 """
626 626 raise NotImplementedError()
627 627
628 628 def _callpush(self, cmd, fp, **args):
629 629 """execute a <cmd> on server
630 630
631 631 The command is expected to be related to a push. Push has a special
632 632 return method.
633 633
634 634 returns the server reply as a (ret, output) tuple. ret is either
635 635 empty (error) or a stringified int.
636 636 """
637 637 raise NotImplementedError()
638 638
639 639 def _calltwowaystream(self, cmd, fp, **args):
640 640 """execute <cmd> on server
641 641
642 642 The command will send a stream to the server and get a stream in reply.
643 643 """
644 644 raise NotImplementedError()
645 645
646 646 def _abort(self, exception):
647 647 """clearly abort the wire protocol connection and raise the exception"""
648 648 raise NotImplementedError()
General Comments 0
You need to be logged in to leave comments. Login now