Show More
@@ -13,7 +13,9 b' import io' | |||
|
13 | 13 | import os |
|
14 | 14 | import socket |
|
15 | 15 | import struct |
|
16 | import sys | |
|
16 | 17 | import tempfile |
|
18 | import weakref | |
|
17 | 19 | |
|
18 | 20 | from .i18n import _ |
|
19 | 21 | from .thirdparty import ( |
@@ -31,7 +33,6 b' from . import (' | |||
|
31 | 33 | statichttprepo, |
|
32 | 34 | url as urlmod, |
|
33 | 35 | util, |
|
34 | wireproto, | |
|
35 | 36 | wireprotoframing, |
|
36 | 37 | wireprototypes, |
|
37 | 38 | wireprotov1peer, |
@@ -517,8 +518,262 b' class httppeer(wireprotov1peer.wirepeer)' | |||
|
517 | 518 | def _abort(self, exception): |
|
518 | 519 | raise exception |
|
519 | 520 | |
|
521 | def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests): | |
|
522 | reactor = wireprotoframing.clientreactor(hasmultiplesend=False, | |
|
523 | buffersends=True) | |
|
524 | ||
|
525 | url = '%s/%s' % (apiurl, permission) | |
|
526 | ||
|
527 | if len(requests) > 1: | |
|
528 | url += '/multirequest' | |
|
529 | else: | |
|
530 | url += '/%s' % requests[0][0] | |
|
531 | ||
|
532 | # Request ID to (request, future) | |
|
533 | requestmap = {} | |
|
534 | ||
|
535 | for command, args, f in requests: | |
|
536 | request, action, meta = reactor.callcommand(command, args) | |
|
537 | assert action == 'noop' | |
|
538 | ||
|
539 | requestmap[request.requestid] = (request, f) | |
|
540 | ||
|
541 | action, meta = reactor.flushcommands() | |
|
542 | assert action == 'sendframes' | |
|
543 | ||
|
544 | # TODO stream this. | |
|
545 | body = b''.join(map(bytes, meta['framegen'])) | |
|
546 | ||
|
547 | # TODO modify user-agent to reflect v2 | |
|
548 | headers = { | |
|
549 | r'Accept': wireprotov2server.FRAMINGTYPE, | |
|
550 | r'Content-Type': wireprotov2server.FRAMINGTYPE, | |
|
551 | } | |
|
552 | ||
|
553 | req = requestbuilder(pycompat.strurl(url), body, headers) | |
|
554 | req.add_unredirected_header(r'Content-Length', r'%d' % len(body)) | |
|
555 | ||
|
556 | try: | |
|
557 | res = opener.open(req) | |
|
558 | except urlerr.httperror as e: | |
|
559 | if e.code == 401: | |
|
560 | raise error.Abort(_('authorization failed')) | |
|
561 | ||
|
562 | raise | |
|
563 | except httplib.HTTPException as e: | |
|
564 | ui.traceback() | |
|
565 | raise IOError(None, e) | |
|
566 | ||
|
567 | return reactor, requestmap, res | |
|
568 | ||
|
569 | class queuedcommandfuture(pycompat.futures.Future): | |
|
570 | """Wraps result() on command futures to trigger submission on call.""" | |
|
571 | ||
|
572 | def result(self, timeout=None): | |
|
573 | if self.done(): | |
|
574 | return pycompat.futures.Future.result(self, timeout) | |
|
575 | ||
|
576 | self._peerexecutor.sendcommands() | |
|
577 | ||
|
578 | # sendcommands() will restore the original __class__ and self.result | |
|
579 | # will resolve to Future.result. | |
|
580 | return self.result(timeout) | |
|
581 | ||
|
582 | @zi.implementer(repository.ipeercommandexecutor) | |
|
583 | class httpv2executor(object): | |
|
584 | def __init__(self, ui, opener, requestbuilder, apiurl, descriptor): | |
|
585 | self._ui = ui | |
|
586 | self._opener = opener | |
|
587 | self._requestbuilder = requestbuilder | |
|
588 | self._apiurl = apiurl | |
|
589 | self._descriptor = descriptor | |
|
590 | self._sent = False | |
|
591 | self._closed = False | |
|
592 | self._neededpermissions = set() | |
|
593 | self._calls = [] | |
|
594 | self._futures = weakref.WeakSet() | |
|
595 | self._responseexecutor = None | |
|
596 | self._responsef = None | |
|
597 | ||
|
598 | def __enter__(self): | |
|
599 | return self | |
|
600 | ||
|
601 | def __exit__(self, exctype, excvalue, exctb): | |
|
602 | self.close() | |
|
603 | ||
|
604 | def callcommand(self, command, args): | |
|
605 | if self._sent: | |
|
606 | raise error.ProgrammingError('callcommand() cannot be used after ' | |
|
607 | 'commands are sent') | |
|
608 | ||
|
609 | if self._closed: | |
|
610 | raise error.ProgrammingError('callcommand() cannot be used after ' | |
|
611 | 'close()') | |
|
612 | ||
|
613 | # The service advertises which commands are available. So if we attempt | |
|
614 | # to call an unknown command or pass an unknown argument, we can screen | |
|
615 | # for this. | |
|
616 | if command not in self._descriptor['commands']: | |
|
617 | raise error.ProgrammingError( | |
|
618 | 'wire protocol command %s is not available' % command) | |
|
619 | ||
|
620 | cmdinfo = self._descriptor['commands'][command] | |
|
621 | unknownargs = set(args.keys()) - set(cmdinfo.get('args', {})) | |
|
622 | ||
|
623 | if unknownargs: | |
|
624 | raise error.ProgrammingError( | |
|
625 | 'wire protocol command %s does not accept argument: %s' % ( | |
|
626 | command, ', '.join(sorted(unknownargs)))) | |
|
627 | ||
|
628 | self._neededpermissions |= set(cmdinfo['permissions']) | |
|
629 | ||
|
630 | # TODO we /could/ also validate types here, since the API descriptor | |
|
631 | # includes types... | |
|
632 | ||
|
633 | f = pycompat.futures.Future() | |
|
634 | ||
|
635 | # Monkeypatch it so result() triggers sendcommands(), otherwise result() | |
|
636 | # could deadlock. | |
|
637 | f.__class__ = queuedcommandfuture | |
|
638 | f._peerexecutor = self | |
|
639 | ||
|
640 | self._futures.add(f) | |
|
641 | self._calls.append((command, args, f)) | |
|
642 | ||
|
643 | return f | |
|
644 | ||
|
645 | def sendcommands(self): | |
|
646 | if self._sent: | |
|
647 | return | |
|
648 | ||
|
649 | if not self._calls: | |
|
650 | return | |
|
651 | ||
|
652 | self._sent = True | |
|
653 | ||
|
654 | # Unhack any future types so caller sees a clean type and so we | |
|
655 | # break reference cycle. | |
|
656 | for f in self._futures: | |
|
657 | if isinstance(f, queuedcommandfuture): | |
|
658 | f.__class__ = pycompat.futures.Future | |
|
659 | f._peerexecutor = None | |
|
660 | ||
|
661 | # Mark the future as running and filter out cancelled futures. | |
|
662 | calls = [(command, args, f) | |
|
663 | for command, args, f in self._calls | |
|
664 | if f.set_running_or_notify_cancel()] | |
|
665 | ||
|
666 | # Clear out references, prevent improper object usage. | |
|
667 | self._calls = None | |
|
668 | ||
|
669 | if not calls: | |
|
670 | return | |
|
671 | ||
|
672 | permissions = set(self._neededpermissions) | |
|
673 | ||
|
674 | if 'push' in permissions and 'pull' in permissions: | |
|
675 | permissions.remove('pull') | |
|
676 | ||
|
677 | if len(permissions) > 1: | |
|
678 | raise error.RepoError(_('cannot make request requiring multiple ' | |
|
679 | 'permissions: %s') % | |
|
680 | _(', ').join(sorted(permissions))) | |
|
681 | ||
|
682 | permission = { | |
|
683 | 'push': 'rw', | |
|
684 | 'pull': 'ro', | |
|
685 | }[permissions.pop()] | |
|
686 | ||
|
687 | reactor, requests, resp = sendv2request( | |
|
688 | self._ui, self._opener, self._requestbuilder, self._apiurl, | |
|
689 | permission, calls) | |
|
690 | ||
|
691 | # TODO we probably want to validate the HTTP code, media type, etc. | |
|
692 | ||
|
693 | self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1) | |
|
694 | self._responsef = self._responseexecutor.submit(self._handleresponse, | |
|
695 | reactor, | |
|
696 | requests, | |
|
697 | resp) | |
|
698 | ||
|
699 | def close(self): | |
|
700 | if self._closed: | |
|
701 | return | |
|
702 | ||
|
703 | self.sendcommands() | |
|
704 | ||
|
705 | self._closed = True | |
|
706 | ||
|
707 | if not self._responsef: | |
|
708 | return | |
|
709 | ||
|
710 | try: | |
|
711 | self._responsef.result() | |
|
712 | finally: | |
|
713 | self._responseexecutor.shutdown(wait=True) | |
|
714 | self._responsef = None | |
|
715 | self._responseexecutor = None | |
|
716 | ||
|
717 | # If any of our futures are still in progress, mark them as | |
|
718 | # errored, otherwise a result() could wait indefinitely. | |
|
719 | for f in self._futures: | |
|
720 | if not f.done(): | |
|
721 | f.set_exception(error.ResponseError( | |
|
722 | _('unfulfilled command response'))) | |
|
723 | ||
|
724 | self._futures = None | |
|
725 | ||
|
726 | def _handleresponse(self, reactor, requests, resp): | |
|
727 | # Called in a thread to read the response. | |
|
728 | ||
|
729 | results = {k: [] for k in requests} | |
|
730 | ||
|
731 | while True: | |
|
732 | frame = wireprotoframing.readframe(resp) | |
|
733 | if frame is None: | |
|
734 | break | |
|
735 | ||
|
736 | self._ui.note(_('received %r\n') % frame) | |
|
737 | ||
|
738 | # Guard against receiving a frame with a request ID that we | |
|
739 | # didn't issue. This should never happen. | |
|
740 | request, f = requests.get(frame.requestid, [None, None]) | |
|
741 | ||
|
742 | action, meta = reactor.onframerecv(frame) | |
|
743 | ||
|
744 | if action == 'responsedata': | |
|
745 | assert request.requestid == meta['request'].requestid | |
|
746 | ||
|
747 | result = results[request.requestid] | |
|
748 | ||
|
749 | if meta['cbor']: | |
|
750 | payload = util.bytesio(meta['data']) | |
|
751 | ||
|
752 | decoder = cbor.CBORDecoder(payload) | |
|
753 | while payload.tell() + 1 < len(meta['data']): | |
|
754 | try: | |
|
755 | result.append(decoder.decode()) | |
|
756 | except Exception: | |
|
757 | f.set_exception_info(*sys.exc_info()[1:]) | |
|
758 | continue | |
|
759 | else: | |
|
760 | result.append(meta['data']) | |
|
761 | ||
|
762 | if meta['eos']: | |
|
763 | f.set_result(result) | |
|
764 | del results[request.requestid] | |
|
765 | ||
|
766 | else: | |
|
767 | e = error.ProgrammingError('unhandled action: %s' % action) | |
|
768 | ||
|
769 | if f: | |
|
770 | f.set_exception(e) | |
|
771 | else: | |
|
772 | raise e | |
|
773 | ||
|
520 | 774 | # TODO implement interface for version 2 peers |
|
521 |
@zi.implementer(repository.ipeerconnection, repository.ipeercapabilities |
|
|
775 | @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities, | |
|
776 | repository.ipeerrequests) | |
|
522 | 777 | class httpv2peer(object): |
|
523 | 778 | def __init__(self, ui, repourl, apipath, opener, requestbuilder, |
|
524 | 779 | apidescriptor): |
@@ -529,6 +784,7 b' class httpv2peer(object):' | |||
|
529 | 784 | |
|
530 | 785 | self._url = repourl |
|
531 | 786 | self._apipath = apipath |
|
787 | self._apiurl = '%s/%s' % (repourl, apipath) | |
|
532 | 788 | self._opener = opener |
|
533 | 789 | self._requestbuilder = requestbuilder |
|
534 | 790 | self._descriptor = apidescriptor |
@@ -580,85 +836,13 b' class httpv2peer(object):' | |||
|
580 | 836 | |
|
581 | 837 | # End of ipeercapabilities. |
|
582 | 838 | |
|
583 | # TODO require to be part of a batched primitive, use futures. | |
|
584 | 839 | def _call(self, name, **args): |
|
585 | """Call a wire protocol command with arguments.""" | |
|
586 | ||
|
587 | # Having this early has a side-effect of importing wireprotov2server, | |
|
588 | # which has the side-effect of ensuring commands are registered. | |
|
589 | ||
|
590 | # TODO modify user-agent to reflect v2. | |
|
591 | headers = { | |
|
592 | r'Accept': wireprotov2server.FRAMINGTYPE, | |
|
593 | r'Content-Type': wireprotov2server.FRAMINGTYPE, | |
|
594 | } | |
|
595 | ||
|
596 | # TODO permissions should come from capabilities results. | |
|
597 | permission = wireproto.commandsv2[name].permission | |
|
598 | if permission not in ('push', 'pull'): | |
|
599 | raise error.ProgrammingError('unknown permission type: %s' % | |
|
600 | permission) | |
|
601 | ||
|
602 | permission = { | |
|
603 | 'push': 'rw', | |
|
604 | 'pull': 'ro', | |
|
605 | }[permission] | |
|
606 | ||
|
607 | url = '%s/%s/%s/%s' % (self._url, self._apipath, permission, name) | |
|
608 | ||
|
609 | # TODO this should be part of a generic peer for the frame-based | |
|
610 | # protocol. | |
|
611 | reactor = wireprotoframing.clientreactor(hasmultiplesend=False, | |
|
612 | buffersends=True) | |
|
613 | ||
|
614 | request, action, meta = reactor.callcommand(name, args) | |
|
615 | assert action == 'noop' | |
|
616 | ||
|
617 | action, meta = reactor.flushcommands() | |
|
618 | assert action == 'sendframes' | |
|
840 | with self.commandexecutor() as e: | |
|
841 | return e.callcommand(name, args).result() | |
|
619 | 842 | |
|
620 | body = b''.join(map(bytes, meta['framegen'])) | |
|
621 | req = self._requestbuilder(pycompat.strurl(url), body, headers) | |
|
622 | req.add_unredirected_header(r'Content-Length', r'%d' % len(body)) | |
|
623 | ||
|
624 | # TODO unify this code with httppeer. | |
|
625 | try: | |
|
626 | res = self._opener.open(req) | |
|
627 | except urlerr.httperror as e: | |
|
628 | if e.code == 401: | |
|
629 | raise error.Abort(_('authorization failed')) | |
|
630 | ||
|
631 | raise | |
|
632 | except httplib.HTTPException as e: | |
|
633 | self.ui.traceback() | |
|
634 | raise IOError(None, e) | |
|
635 | ||
|
636 | # TODO validate response type, wrap response to handle I/O errors. | |
|
637 | # TODO more robust frame receiver. | |
|
638 | results = [] | |
|
639 | ||
|
640 | while True: | |
|
641 | frame = wireprotoframing.readframe(res) | |
|
642 | if frame is None: | |
|
643 | break | |
|
644 | ||
|
645 | self.ui.note(_('received %r\n') % frame) | |
|
646 | ||
|
647 | action, meta = reactor.onframerecv(frame) | |
|
648 | ||
|
649 | if action == 'responsedata': | |
|
650 | if meta['cbor']: | |
|
651 | payload = util.bytesio(meta['data']) | |
|
652 | ||
|
653 | decoder = cbor.CBORDecoder(payload) | |
|
654 | while payload.tell() + 1 < len(meta['data']): | |
|
655 | results.append(decoder.decode()) | |
|
656 | else: | |
|
657 | results.append(meta['data']) | |
|
658 | else: | |
|
659 | error.ProgrammingError('unhandled action: %s' % action) | |
|
660 | ||
|
661 | return results | |
|
843 | def commandexecutor(self): | |
|
844 | return httpv2executor(self.ui, self._opener, self._requestbuilder, | |
|
845 | self._apiurl, self._descriptor) | |
|
662 | 846 | |
|
663 | 847 | # Registry of API service names to metadata about peers that handle it. |
|
664 | 848 | # |
General Comments 0
You need to be logged in to leave comments.
Login now