Show More
@@ -13,7 +13,9 b' import io' | |||||
13 | import os |
|
13 | import os | |
14 | import socket |
|
14 | import socket | |
15 | import struct |
|
15 | import struct | |
|
16 | import sys | |||
16 | import tempfile |
|
17 | import tempfile | |
|
18 | import weakref | |||
17 |
|
19 | |||
18 | from .i18n import _ |
|
20 | from .i18n import _ | |
19 | from .thirdparty import ( |
|
21 | from .thirdparty import ( | |
@@ -31,7 +33,6 b' from . import (' | |||||
31 | statichttprepo, |
|
33 | statichttprepo, | |
32 | url as urlmod, |
|
34 | url as urlmod, | |
33 | util, |
|
35 | util, | |
34 | wireproto, |
|
|||
35 | wireprotoframing, |
|
36 | wireprotoframing, | |
36 | wireprototypes, |
|
37 | wireprototypes, | |
37 | wireprotov1peer, |
|
38 | wireprotov1peer, | |
@@ -517,8 +518,262 b' class httppeer(wireprotov1peer.wirepeer)' | |||||
517 | def _abort(self, exception): |
|
518 | def _abort(self, exception): | |
518 | raise exception |
|
519 | raise exception | |
519 |
|
520 | |||
|
521 | def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests): | |||
|
522 | reactor = wireprotoframing.clientreactor(hasmultiplesend=False, | |||
|
523 | buffersends=True) | |||
|
524 | ||||
|
525 | url = '%s/%s' % (apiurl, permission) | |||
|
526 | ||||
|
527 | if len(requests) > 1: | |||
|
528 | url += '/multirequest' | |||
|
529 | else: | |||
|
530 | url += '/%s' % requests[0][0] | |||
|
531 | ||||
|
532 | # Request ID to (request, future) | |||
|
533 | requestmap = {} | |||
|
534 | ||||
|
535 | for command, args, f in requests: | |||
|
536 | request, action, meta = reactor.callcommand(command, args) | |||
|
537 | assert action == 'noop' | |||
|
538 | ||||
|
539 | requestmap[request.requestid] = (request, f) | |||
|
540 | ||||
|
541 | action, meta = reactor.flushcommands() | |||
|
542 | assert action == 'sendframes' | |||
|
543 | ||||
|
544 | # TODO stream this. | |||
|
545 | body = b''.join(map(bytes, meta['framegen'])) | |||
|
546 | ||||
|
547 | # TODO modify user-agent to reflect v2 | |||
|
548 | headers = { | |||
|
549 | r'Accept': wireprotov2server.FRAMINGTYPE, | |||
|
550 | r'Content-Type': wireprotov2server.FRAMINGTYPE, | |||
|
551 | } | |||
|
552 | ||||
|
553 | req = requestbuilder(pycompat.strurl(url), body, headers) | |||
|
554 | req.add_unredirected_header(r'Content-Length', r'%d' % len(body)) | |||
|
555 | ||||
|
556 | try: | |||
|
557 | res = opener.open(req) | |||
|
558 | except urlerr.httperror as e: | |||
|
559 | if e.code == 401: | |||
|
560 | raise error.Abort(_('authorization failed')) | |||
|
561 | ||||
|
562 | raise | |||
|
563 | except httplib.HTTPException as e: | |||
|
564 | ui.traceback() | |||
|
565 | raise IOError(None, e) | |||
|
566 | ||||
|
567 | return reactor, requestmap, res | |||
|
568 | ||||
|
569 | class queuedcommandfuture(pycompat.futures.Future): | |||
|
570 | """Wraps result() on command futures to trigger submission on call.""" | |||
|
571 | ||||
|
572 | def result(self, timeout=None): | |||
|
573 | if self.done(): | |||
|
574 | return pycompat.futures.Future.result(self, timeout) | |||
|
575 | ||||
|
576 | self._peerexecutor.sendcommands() | |||
|
577 | ||||
|
578 | # sendcommands() will restore the original __class__ and self.result | |||
|
579 | # will resolve to Future.result. | |||
|
580 | return self.result(timeout) | |||
|
581 | ||||
|
582 | @zi.implementer(repository.ipeercommandexecutor) | |||
|
583 | class httpv2executor(object): | |||
|
584 | def __init__(self, ui, opener, requestbuilder, apiurl, descriptor): | |||
|
585 | self._ui = ui | |||
|
586 | self._opener = opener | |||
|
587 | self._requestbuilder = requestbuilder | |||
|
588 | self._apiurl = apiurl | |||
|
589 | self._descriptor = descriptor | |||
|
590 | self._sent = False | |||
|
591 | self._closed = False | |||
|
592 | self._neededpermissions = set() | |||
|
593 | self._calls = [] | |||
|
594 | self._futures = weakref.WeakSet() | |||
|
595 | self._responseexecutor = None | |||
|
596 | self._responsef = None | |||
|
597 | ||||
|
598 | def __enter__(self): | |||
|
599 | return self | |||
|
600 | ||||
|
601 | def __exit__(self, exctype, excvalue, exctb): | |||
|
602 | self.close() | |||
|
603 | ||||
|
604 | def callcommand(self, command, args): | |||
|
605 | if self._sent: | |||
|
606 | raise error.ProgrammingError('callcommand() cannot be used after ' | |||
|
607 | 'commands are sent') | |||
|
608 | ||||
|
609 | if self._closed: | |||
|
610 | raise error.ProgrammingError('callcommand() cannot be used after ' | |||
|
611 | 'close()') | |||
|
612 | ||||
|
613 | # The service advertises which commands are available. So if we attempt | |||
|
614 | # to call an unknown command or pass an unknown argument, we can screen | |||
|
615 | # for this. | |||
|
616 | if command not in self._descriptor['commands']: | |||
|
617 | raise error.ProgrammingError( | |||
|
618 | 'wire protocol command %s is not available' % command) | |||
|
619 | ||||
|
620 | cmdinfo = self._descriptor['commands'][command] | |||
|
621 | unknownargs = set(args.keys()) - set(cmdinfo.get('args', {})) | |||
|
622 | ||||
|
623 | if unknownargs: | |||
|
624 | raise error.ProgrammingError( | |||
|
625 | 'wire protocol command %s does not accept argument: %s' % ( | |||
|
626 | command, ', '.join(sorted(unknownargs)))) | |||
|
627 | ||||
|
628 | self._neededpermissions |= set(cmdinfo['permissions']) | |||
|
629 | ||||
|
630 | # TODO we /could/ also validate types here, since the API descriptor | |||
|
631 | # includes types... | |||
|
632 | ||||
|
633 | f = pycompat.futures.Future() | |||
|
634 | ||||
|
635 | # Monkeypatch it so result() triggers sendcommands(), otherwise result() | |||
|
636 | # could deadlock. | |||
|
637 | f.__class__ = queuedcommandfuture | |||
|
638 | f._peerexecutor = self | |||
|
639 | ||||
|
640 | self._futures.add(f) | |||
|
641 | self._calls.append((command, args, f)) | |||
|
642 | ||||
|
643 | return f | |||
|
644 | ||||
|
645 | def sendcommands(self): | |||
|
646 | if self._sent: | |||
|
647 | return | |||
|
648 | ||||
|
649 | if not self._calls: | |||
|
650 | return | |||
|
651 | ||||
|
652 | self._sent = True | |||
|
653 | ||||
|
654 | # Unhack any future types so caller sees a clean type and so we | |||
|
655 | # break reference cycle. | |||
|
656 | for f in self._futures: | |||
|
657 | if isinstance(f, queuedcommandfuture): | |||
|
658 | f.__class__ = pycompat.futures.Future | |||
|
659 | f._peerexecutor = None | |||
|
660 | ||||
|
661 | # Mark the future as running and filter out cancelled futures. | |||
|
662 | calls = [(command, args, f) | |||
|
663 | for command, args, f in self._calls | |||
|
664 | if f.set_running_or_notify_cancel()] | |||
|
665 | ||||
|
666 | # Clear out references, prevent improper object usage. | |||
|
667 | self._calls = None | |||
|
668 | ||||
|
669 | if not calls: | |||
|
670 | return | |||
|
671 | ||||
|
672 | permissions = set(self._neededpermissions) | |||
|
673 | ||||
|
674 | if 'push' in permissions and 'pull' in permissions: | |||
|
675 | permissions.remove('pull') | |||
|
676 | ||||
|
677 | if len(permissions) > 1: | |||
|
678 | raise error.RepoError(_('cannot make request requiring multiple ' | |||
|
679 | 'permissions: %s') % | |||
|
680 | _(', ').join(sorted(permissions))) | |||
|
681 | ||||
|
682 | permission = { | |||
|
683 | 'push': 'rw', | |||
|
684 | 'pull': 'ro', | |||
|
685 | }[permissions.pop()] | |||
|
686 | ||||
|
687 | reactor, requests, resp = sendv2request( | |||
|
688 | self._ui, self._opener, self._requestbuilder, self._apiurl, | |||
|
689 | permission, calls) | |||
|
690 | ||||
|
691 | # TODO we probably want to validate the HTTP code, media type, etc. | |||
|
692 | ||||
|
693 | self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1) | |||
|
694 | self._responsef = self._responseexecutor.submit(self._handleresponse, | |||
|
695 | reactor, | |||
|
696 | requests, | |||
|
697 | resp) | |||
|
698 | ||||
|
699 | def close(self): | |||
|
700 | if self._closed: | |||
|
701 | return | |||
|
702 | ||||
|
703 | self.sendcommands() | |||
|
704 | ||||
|
705 | self._closed = True | |||
|
706 | ||||
|
707 | if not self._responsef: | |||
|
708 | return | |||
|
709 | ||||
|
710 | try: | |||
|
711 | self._responsef.result() | |||
|
712 | finally: | |||
|
713 | self._responseexecutor.shutdown(wait=True) | |||
|
714 | self._responsef = None | |||
|
715 | self._responseexecutor = None | |||
|
716 | ||||
|
717 | # If any of our futures are still in progress, mark them as | |||
|
718 | # errored, otherwise a result() could wait indefinitely. | |||
|
719 | for f in self._futures: | |||
|
720 | if not f.done(): | |||
|
721 | f.set_exception(error.ResponseError( | |||
|
722 | _('unfulfilled command response'))) | |||
|
723 | ||||
|
724 | self._futures = None | |||
|
725 | ||||
|
726 | def _handleresponse(self, reactor, requests, resp): | |||
|
727 | # Called in a thread to read the response. | |||
|
728 | ||||
|
729 | results = {k: [] for k in requests} | |||
|
730 | ||||
|
731 | while True: | |||
|
732 | frame = wireprotoframing.readframe(resp) | |||
|
733 | if frame is None: | |||
|
734 | break | |||
|
735 | ||||
|
736 | self._ui.note(_('received %r\n') % frame) | |||
|
737 | ||||
|
738 | # Guard against receiving a frame with a request ID that we | |||
|
739 | # didn't issue. This should never happen. | |||
|
740 | request, f = requests.get(frame.requestid, [None, None]) | |||
|
741 | ||||
|
742 | action, meta = reactor.onframerecv(frame) | |||
|
743 | ||||
|
744 | if action == 'responsedata': | |||
|
745 | assert request.requestid == meta['request'].requestid | |||
|
746 | ||||
|
747 | result = results[request.requestid] | |||
|
748 | ||||
|
749 | if meta['cbor']: | |||
|
750 | payload = util.bytesio(meta['data']) | |||
|
751 | ||||
|
752 | decoder = cbor.CBORDecoder(payload) | |||
|
753 | while payload.tell() + 1 < len(meta['data']): | |||
|
754 | try: | |||
|
755 | result.append(decoder.decode()) | |||
|
756 | except Exception: | |||
|
757 | f.set_exception_info(*sys.exc_info()[1:]) | |||
|
758 | continue | |||
|
759 | else: | |||
|
760 | result.append(meta['data']) | |||
|
761 | ||||
|
762 | if meta['eos']: | |||
|
763 | f.set_result(result) | |||
|
764 | del results[request.requestid] | |||
|
765 | ||||
|
766 | else: | |||
|
767 | e = error.ProgrammingError('unhandled action: %s' % action) | |||
|
768 | ||||
|
769 | if f: | |||
|
770 | f.set_exception(e) | |||
|
771 | else: | |||
|
772 | raise e | |||
|
773 | ||||
520 | # TODO implement interface for version 2 peers |
|
774 | # TODO implement interface for version 2 peers | |
521 |
@zi.implementer(repository.ipeerconnection, repository.ipeercapabilities |
|
775 | @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities, | |
|
776 | repository.ipeerrequests) | |||
522 | class httpv2peer(object): |
|
777 | class httpv2peer(object): | |
523 | def __init__(self, ui, repourl, apipath, opener, requestbuilder, |
|
778 | def __init__(self, ui, repourl, apipath, opener, requestbuilder, | |
524 | apidescriptor): |
|
779 | apidescriptor): | |
@@ -529,6 +784,7 b' class httpv2peer(object):' | |||||
529 |
|
784 | |||
530 | self._url = repourl |
|
785 | self._url = repourl | |
531 | self._apipath = apipath |
|
786 | self._apipath = apipath | |
|
787 | self._apiurl = '%s/%s' % (repourl, apipath) | |||
532 | self._opener = opener |
|
788 | self._opener = opener | |
533 | self._requestbuilder = requestbuilder |
|
789 | self._requestbuilder = requestbuilder | |
534 | self._descriptor = apidescriptor |
|
790 | self._descriptor = apidescriptor | |
@@ -580,85 +836,13 b' class httpv2peer(object):' | |||||
580 |
|
836 | |||
581 | # End of ipeercapabilities. |
|
837 | # End of ipeercapabilities. | |
582 |
|
838 | |||
583 | # TODO require to be part of a batched primitive, use futures. |
|
|||
584 | def _call(self, name, **args): |
|
839 | def _call(self, name, **args): | |
585 | """Call a wire protocol command with arguments.""" |
|
840 | with self.commandexecutor() as e: | |
586 |
|
841 | return e.callcommand(name, args).result() | ||
587 | # Having this early has a side-effect of importing wireprotov2server, |
|
|||
588 | # which has the side-effect of ensuring commands are registered. |
|
|||
589 |
|
||||
590 | # TODO modify user-agent to reflect v2. |
|
|||
591 | headers = { |
|
|||
592 | r'Accept': wireprotov2server.FRAMINGTYPE, |
|
|||
593 | r'Content-Type': wireprotov2server.FRAMINGTYPE, |
|
|||
594 | } |
|
|||
595 |
|
||||
596 | # TODO permissions should come from capabilities results. |
|
|||
597 | permission = wireproto.commandsv2[name].permission |
|
|||
598 | if permission not in ('push', 'pull'): |
|
|||
599 | raise error.ProgrammingError('unknown permission type: %s' % |
|
|||
600 | permission) |
|
|||
601 |
|
||||
602 | permission = { |
|
|||
603 | 'push': 'rw', |
|
|||
604 | 'pull': 'ro', |
|
|||
605 | }[permission] |
|
|||
606 |
|
||||
607 | url = '%s/%s/%s/%s' % (self._url, self._apipath, permission, name) |
|
|||
608 |
|
||||
609 | # TODO this should be part of a generic peer for the frame-based |
|
|||
610 | # protocol. |
|
|||
611 | reactor = wireprotoframing.clientreactor(hasmultiplesend=False, |
|
|||
612 | buffersends=True) |
|
|||
613 |
|
||||
614 | request, action, meta = reactor.callcommand(name, args) |
|
|||
615 | assert action == 'noop' |
|
|||
616 |
|
||||
617 | action, meta = reactor.flushcommands() |
|
|||
618 | assert action == 'sendframes' |
|
|||
619 |
|
842 | |||
620 | body = b''.join(map(bytes, meta['framegen'])) |
|
843 | def commandexecutor(self): | |
621 | req = self._requestbuilder(pycompat.strurl(url), body, headers) |
|
844 | return httpv2executor(self.ui, self._opener, self._requestbuilder, | |
622 | req.add_unredirected_header(r'Content-Length', r'%d' % len(body)) |
|
845 | self._apiurl, self._descriptor) | |
623 |
|
||||
624 | # TODO unify this code with httppeer. |
|
|||
625 | try: |
|
|||
626 | res = self._opener.open(req) |
|
|||
627 | except urlerr.httperror as e: |
|
|||
628 | if e.code == 401: |
|
|||
629 | raise error.Abort(_('authorization failed')) |
|
|||
630 |
|
||||
631 | raise |
|
|||
632 | except httplib.HTTPException as e: |
|
|||
633 | self.ui.traceback() |
|
|||
634 | raise IOError(None, e) |
|
|||
635 |
|
||||
636 | # TODO validate response type, wrap response to handle I/O errors. |
|
|||
637 | # TODO more robust frame receiver. |
|
|||
638 | results = [] |
|
|||
639 |
|
||||
640 | while True: |
|
|||
641 | frame = wireprotoframing.readframe(res) |
|
|||
642 | if frame is None: |
|
|||
643 | break |
|
|||
644 |
|
||||
645 | self.ui.note(_('received %r\n') % frame) |
|
|||
646 |
|
||||
647 | action, meta = reactor.onframerecv(frame) |
|
|||
648 |
|
||||
649 | if action == 'responsedata': |
|
|||
650 | if meta['cbor']: |
|
|||
651 | payload = util.bytesio(meta['data']) |
|
|||
652 |
|
||||
653 | decoder = cbor.CBORDecoder(payload) |
|
|||
654 | while payload.tell() + 1 < len(meta['data']): |
|
|||
655 | results.append(decoder.decode()) |
|
|||
656 | else: |
|
|||
657 | results.append(meta['data']) |
|
|||
658 | else: |
|
|||
659 | error.ProgrammingError('unhandled action: %s' % action) |
|
|||
660 |
|
||||
661 | return results |
|
|||
662 |
|
846 | |||
663 | # Registry of API service names to metadata about peers that handle it. |
|
847 | # Registry of API service names to metadata about peers that handle it. | |
664 | # |
|
848 | # |
General Comments 0
You need to be logged in to leave comments.
Login now