Show More
@@ -0,0 +1,135 | |||
|
1 | # wireprotov2peer.py - client side code for wire protocol version 2 | |
|
2 | # | |
|
3 | # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com> | |
|
4 | # | |
|
5 | # This software may be used and distributed according to the terms of the | |
|
6 | # GNU General Public License version 2 or any later version. | |
|
7 | ||
|
8 | from __future__ import absolute_import | |
|
9 | ||
|
10 | from .i18n import _ | |
|
11 | from .thirdparty import ( | |
|
12 | cbor, | |
|
13 | ) | |
|
14 | from . import ( | |
|
15 | error, | |
|
16 | util, | |
|
17 | wireprotoframing, | |
|
18 | ) | |
|
19 | ||
|
20 | class clienthandler(object): | |
|
21 | """Object to handle higher-level client activities. | |
|
22 | ||
|
23 | The ``clientreactor`` is used to hold low-level state about the frame-based | |
|
24 | protocol, such as which requests and streams are active. This type is used | |
|
25 | for higher-level operations, such as reading frames from a socket, exposing | |
|
26 | and managing a higher-level primitive for representing command responses, | |
|
27 | etc. This class is what peers should probably use to bridge wire activity | |
|
28 | with the higher-level peer API. | |
|
29 | """ | |
|
30 | ||
|
31 | def __init__(self, ui, clientreactor): | |
|
32 | self._ui = ui | |
|
33 | self._reactor = clientreactor | |
|
34 | self._requests = {} | |
|
35 | self._futures = {} | |
|
36 | self._responses = {} | |
|
37 | ||
|
38 | def callcommand(self, command, args, f): | |
|
39 | """Register a request to call a command. | |
|
40 | ||
|
41 | Returns an iterable of frames that should be sent over the wire. | |
|
42 | """ | |
|
43 | request, action, meta = self._reactor.callcommand(command, args) | |
|
44 | ||
|
45 | if action != 'noop': | |
|
46 | raise error.ProgrammingError('%s not yet supported' % action) | |
|
47 | ||
|
48 | rid = request.requestid | |
|
49 | self._requests[rid] = request | |
|
50 | self._futures[rid] = f | |
|
51 | self._responses[rid] = { | |
|
52 | 'cbor': False, | |
|
53 | 'b': util.bytesio(), | |
|
54 | } | |
|
55 | ||
|
56 | return iter(()) | |
|
57 | ||
|
58 | def flushcommands(self): | |
|
59 | """Flush all queued commands. | |
|
60 | ||
|
61 | Returns an iterable of frames that should be sent over the wire. | |
|
62 | """ | |
|
63 | action, meta = self._reactor.flushcommands() | |
|
64 | ||
|
65 | if action != 'sendframes': | |
|
66 | raise error.ProgrammingError('%s not yet supported' % action) | |
|
67 | ||
|
68 | return meta['framegen'] | |
|
69 | ||
|
70 | def readframe(self, fh): | |
|
71 | """Attempt to read and process a frame. | |
|
72 | ||
|
73 | Returns None if no frame was read. Presumably this means EOF. | |
|
74 | """ | |
|
75 | frame = wireprotoframing.readframe(fh) | |
|
76 | if frame is None: | |
|
77 | # TODO tell reactor? | |
|
78 | return | |
|
79 | ||
|
80 | self._ui.note(_('received %r\n') % frame) | |
|
81 | self._processframe(frame) | |
|
82 | ||
|
83 | return True | |
|
84 | ||
|
85 | def _processframe(self, frame): | |
|
86 | """Process a single read frame.""" | |
|
87 | ||
|
88 | action, meta = self._reactor.onframerecv(frame) | |
|
89 | ||
|
90 | if action == 'error': | |
|
91 | e = error.RepoError(meta['message']) | |
|
92 | ||
|
93 | if frame.requestid in self._futures: | |
|
94 | self._futures[frame.requestid].set_exception(e) | |
|
95 | else: | |
|
96 | raise e | |
|
97 | ||
|
98 | if frame.requestid not in self._requests: | |
|
99 | raise error.ProgrammingError( | |
|
100 | 'received frame for unknown request; this is either a bug in ' | |
|
101 | 'the clientreactor not screening for this or this instance was ' | |
|
102 | 'never told about this request: %r' % frame) | |
|
103 | ||
|
104 | response = self._responses[frame.requestid] | |
|
105 | ||
|
106 | if action == 'responsedata': | |
|
107 | response['b'].write(meta['data']) | |
|
108 | ||
|
109 | if meta['cbor']: | |
|
110 | response['cbor'] = True | |
|
111 | ||
|
112 | if meta['eos']: | |
|
113 | if meta['cbor']: | |
|
114 | # If CBOR, decode every object. | |
|
115 | b = response['b'] | |
|
116 | ||
|
117 | size = b.tell() | |
|
118 | b.seek(0) | |
|
119 | ||
|
120 | decoder = cbor.CBORDecoder(b) | |
|
121 | ||
|
122 | result = [] | |
|
123 | while b.tell() < size: | |
|
124 | result.append(decoder.decode()) | |
|
125 | else: | |
|
126 | result = [response['b'].getvalue()] | |
|
127 | ||
|
128 | self._futures[frame.requestid].set_result(result) | |
|
129 | ||
|
130 | del self._requests[frame.requestid] | |
|
131 | del self._futures[frame.requestid] | |
|
132 | ||
|
133 | else: | |
|
134 | raise error.ProgrammingError( | |
|
135 | 'unhandled action from clientreactor: %s' % action) |
@@ -13,7 +13,6 import io | |||
|
13 | 13 | import os |
|
14 | 14 | import socket |
|
15 | 15 | import struct |
|
16 | import sys | |
|
17 | 16 | import tempfile |
|
18 | 17 | import weakref |
|
19 | 18 | |
@@ -36,6 +35,7 from . import ( | |||
|
36 | 35 | wireprotoframing, |
|
37 | 36 | wireprototypes, |
|
38 | 37 | wireprotov1peer, |
|
38 | wireprotov2peer, | |
|
39 | 39 | wireprotov2server, |
|
40 | 40 | ) |
|
41 | 41 | |
@@ -522,6 +522,8 def sendv2request(ui, opener, requestbui | |||
|
522 | 522 | reactor = wireprotoframing.clientreactor(hasmultiplesend=False, |
|
523 | 523 | buffersends=True) |
|
524 | 524 | |
|
525 | handler = wireprotov2peer.clienthandler(ui, reactor) | |
|
526 | ||
|
525 | 527 | url = '%s/%s' % (apiurl, permission) |
|
526 | 528 | |
|
527 | 529 | if len(requests) > 1: |
@@ -529,20 +531,11 def sendv2request(ui, opener, requestbui | |||
|
529 | 531 | else: |
|
530 | 532 | url += '/%s' % requests[0][0] |
|
531 | 533 | |
|
532 | # Request ID to (request, future) | |
|
533 | requestmap = {} | |
|
534 | ||
|
535 | 534 | for command, args, f in requests: |
|
536 |
|
|
|
537 | assert action == 'noop' | |
|
538 | ||
|
539 | requestmap[request.requestid] = (request, f) | |
|
540 | ||
|
541 | action, meta = reactor.flushcommands() | |
|
542 | assert action == 'sendframes' | |
|
535 | assert not list(handler.callcommand(command, args, f)) | |
|
543 | 536 | |
|
544 | 537 | # TODO stream this. |
|
545 |
body = b''.join(map(bytes, |
|
|
538 | body = b''.join(map(bytes, handler.flushcommands())) | |
|
546 | 539 | |
|
547 | 540 | # TODO modify user-agent to reflect v2 |
|
548 | 541 | headers = { |
@@ -564,7 +557,7 def sendv2request(ui, opener, requestbui | |||
|
564 | 557 | ui.traceback() |
|
565 | 558 | raise IOError(None, e) |
|
566 | 559 | |
|
567 |
return |
|
|
560 | return handler, res | |
|
568 | 561 | |
|
569 | 562 | class queuedcommandfuture(pycompat.futures.Future): |
|
570 | 563 | """Wraps result() on command futures to trigger submission on call.""" |
@@ -684,7 +677,7 class httpv2executor(object): | |||
|
684 | 677 | 'pull': 'ro', |
|
685 | 678 | }[permissions.pop()] |
|
686 | 679 | |
|
687 |
|
|
|
680 | handler, resp = sendv2request( | |
|
688 | 681 | self._ui, self._opener, self._requestbuilder, self._apiurl, |
|
689 | 682 | permission, calls) |
|
690 | 683 | |
@@ -692,9 +685,7 class httpv2executor(object): | |||
|
692 | 685 | |
|
693 | 686 | self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1) |
|
694 | 687 | self._responsef = self._responseexecutor.submit(self._handleresponse, |
|
695 |
|
|
|
696 | requests, | |
|
697 | resp) | |
|
688 | handler, resp) | |
|
698 | 689 | |
|
699 | 690 | def close(self): |
|
700 | 691 | if self._closed: |
@@ -723,62 +714,11 class httpv2executor(object): | |||
|
723 | 714 | |
|
724 | 715 | self._futures = None |
|
725 | 716 | |
|
726 |
def _handleresponse(self, |
|
|
717 | def _handleresponse(self, handler, resp): | |
|
727 | 718 | # Called in a thread to read the response. |
|
728 | 719 | |
|
729 | results = {k: [] for k in requests} | |
|
730 | ||
|
731 | while True: | |
|
732 | frame = wireprotoframing.readframe(resp) | |
|
733 | if frame is None: | |
|
734 | break | |
|
735 | ||
|
736 | self._ui.note(_('received %r\n') % frame) | |
|
737 | ||
|
738 | # Guard against receiving a frame with a request ID that we | |
|
739 | # didn't issue. This should never happen. | |
|
740 | request, f = requests.get(frame.requestid, [None, None]) | |
|
741 | ||
|
742 | action, meta = reactor.onframerecv(frame) | |
|
743 | ||
|
744 | if action == 'responsedata': | |
|
745 | assert request.requestid == meta['request'].requestid | |
|
746 | ||
|
747 | result = results[request.requestid] | |
|
748 | ||
|
749 | if meta['cbor']: | |
|
750 | payload = util.bytesio(meta['data']) | |
|
751 | ||
|
752 | decoder = cbor.CBORDecoder(payload) | |
|
753 | while payload.tell() + 1 < len(meta['data']): | |
|
754 | try: | |
|
755 | result.append(decoder.decode()) | |
|
756 | except Exception: | |
|
757 | pycompat.future_set_exception_info( | |
|
758 | f, sys.exc_info()[1:]) | |
|
759 | continue | |
|
760 | else: | |
|
761 | result.append(meta['data']) | |
|
762 | ||
|
763 | if meta['eos']: | |
|
764 | f.set_result(result) | |
|
765 | del results[request.requestid] | |
|
766 | ||
|
767 | elif action == 'error': | |
|
768 | e = error.RepoError(meta['message']) | |
|
769 | ||
|
770 | if f: | |
|
771 | f.set_exception(e) | |
|
772 | else: | |
|
773 | raise e | |
|
774 | ||
|
775 | else: | |
|
776 | e = error.ProgrammingError('unhandled action: %s' % action) | |
|
777 | ||
|
778 | if f: | |
|
779 | f.set_exception(e) | |
|
780 | else: | |
|
781 | raise e | |
|
720 | while handler.readframe(resp): | |
|
721 | pass | |
|
782 | 722 | |
|
783 | 723 | # TODO implement interface for version 2 peers |
|
784 | 724 | @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities, |
General Comments 0
You need to be logged in to leave comments.
Login now