|
@@
-7,11
+7,12
b''
|
|
7
|
|
|
7
|
|
|
8
|
from __future__ import absolute_import
|
|
8
|
from __future__ import absolute_import
|
|
9
|
|
|
9
|
|
|
|
|
|
10
|
import threading
|
|
|
|
|
11
|
|
|
10
|
from .i18n import _
|
|
12
|
from .i18n import _
|
|
11
|
from . import (
|
|
13
|
from . import (
|
|
12
|
encoding,
|
|
14
|
encoding,
|
|
13
|
error,
|
|
15
|
error,
|
|
14
|
util,
|
|
|
|
|
15
|
wireprotoframing,
|
|
16
|
wireprotoframing,
|
|
16
|
)
|
|
17
|
)
|
|
17
|
from .utils import (
|
|
18
|
from .utils import (
|
|
@@
-34,20
+35,101
b' def formatrichmessage(atoms):'
|
|
34
|
return b''.join(chunks)
|
|
35
|
return b''.join(chunks)
|
|
35
|
|
|
36
|
|
|
36
|
class commandresponse(object):
|
|
37
|
class commandresponse(object):
|
|
37
|
"""Represents the response to a command request."""
|
|
38
|
"""Represents the response to a command request.
|
|
|
|
|
39
|
|
|
|
|
|
40
|
Instances track the state of the command and hold its results.
|
|
|
|
|
41
|
|
|
|
|
|
42
|
An external entity is required to update the state of the object when
|
|
|
|
|
43
|
events occur.
|
|
|
|
|
44
|
"""
|
|
38
|
|
|
45
|
|
|
39
|
def __init__(self, requestid, command):
|
|
46
|
def __init__(self, requestid, command):
|
|
40
|
self.requestid = requestid
|
|
47
|
self.requestid = requestid
|
|
41
|
self.command = command
|
|
48
|
self.command = command
|
|
42
|
|
|
49
|
|
|
43
|
self.b = util.bytesio()
|
|
50
|
# Whether all remote input related to this command has been
|
|
|
|
|
51
|
# received.
|
|
|
|
|
52
|
self._inputcomplete = False
|
|
|
|
|
53
|
|
|
|
|
|
54
|
# We have a lock that is acquired when important object state is
|
|
|
|
|
55
|
# mutated. This is to prevent race conditions between 1 thread
|
|
|
|
|
56
|
# sending us new data and another consuming it.
|
|
|
|
|
57
|
self._lock = threading.RLock()
|
|
|
|
|
58
|
|
|
|
|
|
59
|
# An event is set when state of the object changes. This event
|
|
|
|
|
60
|
# is waited on by the generator emitting objects.
|
|
|
|
|
61
|
self._serviceable = threading.Event()
|
|
|
|
|
62
|
|
|
|
|
|
63
|
self._pendingevents = []
|
|
|
|
|
64
|
self._decoder = cborutil.bufferingdecoder()
|
|
|
|
|
65
|
self._seeninitial = False
|
|
|
|
|
66
|
|
|
|
|
|
67
|
def _oninputcomplete(self):
|
|
|
|
|
68
|
with self._lock:
|
|
|
|
|
69
|
self._inputcomplete = True
|
|
|
|
|
70
|
self._serviceable.set()
|
|
|
|
|
71
|
|
|
|
|
|
72
|
def _onresponsedata(self, data):
|
|
|
|
|
73
|
available, readcount, wanted = self._decoder.decode(data)
|
|
|
|
|
74
|
|
|
|
|
|
75
|
if not available:
|
|
|
|
|
76
|
return
|
|
|
|
|
77
|
|
|
|
|
|
78
|
with self._lock:
|
|
|
|
|
79
|
for o in self._decoder.getavailable():
|
|
|
|
|
80
|
if not self._seeninitial:
|
|
|
|
|
81
|
self._handleinitial(o)
|
|
|
|
|
82
|
continue
|
|
|
|
|
83
|
|
|
|
|
|
84
|
self._pendingevents.append(o)
|
|
|
|
|
85
|
|
|
|
|
|
86
|
self._serviceable.set()
|
|
44
|
|
|
87
|
|
|
45
|
def cborobjects(self):
|
|
88
|
def _handleinitial(self, o):
|
|
46
|
"""Obtain decoded CBOR objects from this response."""
|
|
89
|
self._seeninitial = True
|
|
47
|
self.b.seek(0)
|
|
90
|
if o[b'status'] == 'ok':
|
|
|
|
|
91
|
return
|
|
|
|
|
92
|
|
|
|
|
|
93
|
atoms = [{'msg': o[b'error'][b'message']}]
|
|
|
|
|
94
|
if b'args' in o[b'error']:
|
|
|
|
|
95
|
atoms[0]['args'] = o[b'error'][b'args']
|
|
|
|
|
96
|
|
|
|
|
|
97
|
raise error.RepoError(formatrichmessage(atoms))
|
|
|
|
|
98
|
|
|
|
|
|
99
|
def objects(self):
|
|
|
|
|
100
|
"""Obtained decoded objects from this response.
|
|
|
|
|
101
|
|
|
|
|
|
102
|
This is a generator of data structures that were decoded from the
|
|
|
|
|
103
|
command response.
|
|
|
|
|
104
|
|
|
|
|
|
105
|
Obtaining the next member of the generator may block due to waiting
|
|
|
|
|
106
|
on external data to become available.
|
|
48
|
|
|
107
|
|
|
49
|
for v in cborutil.decodeall(self.b.getvalue()):
|
|
108
|
If the server encountered an error in the middle of serving the data
|
|
50
|
yield v
|
|
109
|
or if another error occurred, an exception may be raised when
|
|
|
|
|
110
|
advancing the generator.
|
|
|
|
|
111
|
"""
|
|
|
|
|
112
|
while True:
|
|
|
|
|
113
|
# TODO this can infinite loop if self._inputcomplete is never
|
|
|
|
|
114
|
# set. We likely want to tie the lifetime of this object/state
|
|
|
|
|
115
|
# to that of the background thread receiving frames and updating
|
|
|
|
|
116
|
# our state.
|
|
|
|
|
117
|
self._serviceable.wait(1.0)
|
|
|
|
|
118
|
|
|
|
|
|
119
|
with self._lock:
|
|
|
|
|
120
|
self._serviceable.clear()
|
|
|
|
|
121
|
|
|
|
|
|
122
|
# Make copies because objects could be mutated during
|
|
|
|
|
123
|
# iteration.
|
|
|
|
|
124
|
stop = self._inputcomplete
|
|
|
|
|
125
|
pending = list(self._pendingevents)
|
|
|
|
|
126
|
self._pendingevents[:] = []
|
|
|
|
|
127
|
|
|
|
|
|
128
|
for o in pending:
|
|
|
|
|
129
|
yield o
|
|
|
|
|
130
|
|
|
|
|
|
131
|
if stop:
|
|
|
|
|
132
|
break
|
|
51
|
|
|
133
|
|
|
52
|
class clienthandler(object):
|
|
134
|
class clienthandler(object):
|
|
53
|
"""Object to handle higher-level client activities.
|
|
135
|
"""Object to handle higher-level client activities.
|
|
@@
-80,6
+162,8
b' class clienthandler(object):'
|
|
80
|
rid = request.requestid
|
|
162
|
rid = request.requestid
|
|
81
|
self._requests[rid] = request
|
|
163
|
self._requests[rid] = request
|
|
82
|
self._futures[rid] = f
|
|
164
|
self._futures[rid] = f
|
|
|
|
|
165
|
# TODO we need some kind of lifetime on response instances otherwise
|
|
|
|
|
166
|
# objects() may deadlock.
|
|
83
|
self._responses[rid] = commandresponse(rid, command)
|
|
167
|
self._responses[rid] = commandresponse(rid, command)
|
|
84
|
|
|
168
|
|
|
85
|
return iter(())
|
|
169
|
return iter(())
|
|
@@
-119,8
+203,12
b' class clienthandler(object):'
|
|
119
|
if action == 'error':
|
|
203
|
if action == 'error':
|
|
120
|
e = error.RepoError(meta['message'])
|
|
204
|
e = error.RepoError(meta['message'])
|
|
121
|
|
|
205
|
|
|
|
|
|
206
|
if frame.requestid in self._responses:
|
|
|
|
|
207
|
self._responses[frame.requestid]._oninputcomplete()
|
|
|
|
|
208
|
|
|
122
|
if frame.requestid in self._futures:
|
|
209
|
if frame.requestid in self._futures:
|
|
123
|
self._futures[frame.requestid].set_exception(e)
|
|
210
|
self._futures[frame.requestid].set_exception(e)
|
|
|
|
|
211
|
del self._futures[frame.requestid]
|
|
124
|
else:
|
|
212
|
else:
|
|
125
|
raise e
|
|
213
|
raise e
|
|
126
|
|
|
214
|
|
|
@@
-141,39
+229,32
b' class clienthandler(object):'
|
|
141
|
self._processresponsedata(frame, meta, response)
|
|
229
|
self._processresponsedata(frame, meta, response)
|
|
142
|
except BaseException as e:
|
|
230
|
except BaseException as e:
|
|
143
|
self._futures[frame.requestid].set_exception(e)
|
|
231
|
self._futures[frame.requestid].set_exception(e)
|
|
|
|
|
232
|
del self._futures[frame.requestid]
|
|
|
|
|
233
|
response._oninputcomplete()
|
|
144
|
else:
|
|
234
|
else:
|
|
145
|
raise error.ProgrammingError(
|
|
235
|
raise error.ProgrammingError(
|
|
146
|
'unhandled action from clientreactor: %s' % action)
|
|
236
|
'unhandled action from clientreactor: %s' % action)
|
|
147
|
|
|
237
|
|
|
148
|
def _processresponsedata(self, frame, meta, response):
|
|
238
|
def _processresponsedata(self, frame, meta, response):
|
|
149
|
# This buffers all data until end of stream is received. This
|
|
239
|
# This can raise. The caller can handle it.
|
|
150
|
# is bad for performance.
|
|
240
|
response._onresponsedata(meta['data'])
|
|
151
|
# TODO make response data streamable
|
|
|
|
|
152
|
response.b.write(meta['data'])
|
|
|
|
|
153
|
|
|
241
|
|
|
154
|
if meta['eos']:
|
|
242
|
if meta['eos']:
|
|
155
|
# If the command has a decoder, resolve the future to the
|
|
243
|
response._oninputcomplete()
|
|
156
|
# decoded value. Otherwise resolve to the rich response object.
|
|
244
|
del self._requests[frame.requestid]
|
|
157
|
decoder = COMMAND_DECODERS.get(response.command)
|
|
|
|
|
158
|
|
|
|
|
|
159
|
# TODO consider always resolving the overall status map.
|
|
|
|
|
160
|
if decoder:
|
|
|
|
|
161
|
objs = response.cborobjects()
|
|
|
|
|
162
|
|
|
|
|
|
163
|
overall = next(objs)
|
|
|
|
|
164
|
|
|
245
|
|
|
165
|
if overall['status'] == 'ok':
|
|
246
|
# If the command has a decoder, we wait until all input has been
|
|
166
|
self._futures[frame.requestid].set_result(decoder(objs))
|
|
247
|
# received before resolving the future. Otherwise we resolve the
|
|
167
|
else:
|
|
248
|
# future immediately.
|
|
168
|
atoms = [{'msg': overall['error']['message']}]
|
|
249
|
if frame.requestid not in self._futures:
|
|
169
|
if 'args' in overall['error']:
|
|
250
|
return
|
|
170
|
atoms[0]['args'] = overall['error']['args']
|
|
|
|
|
171
|
e = error.RepoError(formatrichmessage(atoms))
|
|
|
|
|
172
|
self._futures[frame.requestid].set_exception(e)
|
|
|
|
|
173
|
else:
|
|
|
|
|
174
|
self._futures[frame.requestid].set_result(response)
|
|
|
|
|
175
|
|
|
251
|
|
|
176
|
del self._requests[frame.requestid]
|
|
252
|
if response.command not in COMMAND_DECODERS:
|
|
|
|
|
253
|
self._futures[frame.requestid].set_result(response.objects())
|
|
|
|
|
254
|
del self._futures[frame.requestid]
|
|
|
|
|
255
|
elif response._inputcomplete:
|
|
|
|
|
256
|
decoded = COMMAND_DECODERS[response.command](response.objects())
|
|
|
|
|
257
|
self._futures[frame.requestid].set_result(decoded)
|
|
177
|
del self._futures[frame.requestid]
|
|
258
|
del self._futures[frame.requestid]
|
|
178
|
|
|
259
|
|
|
179
|
def decodebranchmap(objs):
|
|
260
|
def decodebranchmap(objs):
|