##// END OF EJS Templates
wireprotov1peer: simplify the way batchable rpcs are defined...
Valentin Gatien-Baron -
r48686:cdad6560 default
parent child Browse files
Show More
@@ -1,670 +1,673 b''
1 # wireprotov1peer.py - Client-side functionality for wire protocol version 1.
1 # wireprotov1peer.py - Client-side functionality for wire protocol version 1.
2 #
2 #
3 # Copyright 2005-2010 Olivia Mackall <olivia@selenic.com>
3 # Copyright 2005-2010 Olivia Mackall <olivia@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import sys
10 import sys
11 import weakref
11 import weakref
12
12
13 from .i18n import _
13 from .i18n import _
14 from .node import bin
14 from .node import bin
15 from .pycompat import (
15 from .pycompat import (
16 getattr,
16 getattr,
17 setattr,
17 setattr,
18 )
18 )
19 from . import (
19 from . import (
20 bundle2,
20 bundle2,
21 changegroup as changegroupmod,
21 changegroup as changegroupmod,
22 encoding,
22 encoding,
23 error,
23 error,
24 pushkey as pushkeymod,
24 pushkey as pushkeymod,
25 pycompat,
25 pycompat,
26 util,
26 util,
27 wireprototypes,
27 wireprototypes,
28 )
28 )
29 from .interfaces import (
29 from .interfaces import (
30 repository,
30 repository,
31 util as interfaceutil,
31 util as interfaceutil,
32 )
32 )
33 from .utils import hashutil
33 from .utils import hashutil
34
34
35 urlreq = util.urlreq
35 urlreq = util.urlreq
36
36
37
37
38 def batchable(f):
38 def batchable_new_style(f):
39 """annotation for batchable methods
39 """annotation for batchable methods
40
40
41 Such methods must implement a coroutine as follows:
41 Such methods must implement a coroutine as follows:
42
42
43 @batchable
43 @batchable
44 def sample(self, one, two=None):
44 def sample(self, one, two=None):
45 # Build list of encoded arguments suitable for your wire protocol:
45 # Build list of encoded arguments suitable for your wire protocol:
46 encoded_args = [('one', encode(one),), ('two', encode(two),)]
46 encoded_args = [('one', encode(one),), ('two', encode(two),)]
47 # Create future for injection of encoded result:
47 # Return it, along with a function that will receive the result
48 encoded_res_future = future()
48 # from the batched request.
49 # Return encoded arguments and future:
49 return encoded_args, decode
50 yield encoded_args, encoded_res_future
51 # Assuming the future to be filled with the result from the batched
52 # request now. Decode it:
53 yield decode(encoded_res_future.value)
54
50
55 The decorator returns a function which wraps this coroutine as a plain
51 The decorator returns a function which wraps this coroutine as a plain
56 method, but adds the original method as an attribute called "batchable",
52 method, but adds the original method as an attribute called "batchable",
57 which is used by remotebatch to split the call into separate encoding and
53 which is used by remotebatch to split the call into separate encoding and
58 decoding phases.
54 decoding phases.
59 """
55 """
60
56
61 def plain(*args, **opts):
57 def plain(*args, **opts):
62 batchable = f(*args, **opts)
58 encoded_args_or_res, decode = f(*args, **opts)
63 encoded_args_or_res, encoded_res_future = next(batchable)
59 if not decode:
64 if not encoded_res_future:
65 return encoded_args_or_res # a local result in this case
60 return encoded_args_or_res # a local result in this case
66 self = args[0]
61 self = args[0]
67 cmd = pycompat.bytesurl(f.__name__) # ensure cmd is ascii bytestr
62 cmd = pycompat.bytesurl(f.__name__) # ensure cmd is ascii bytestr
68 encoded_res_future.set(self._submitone(cmd, encoded_args_or_res))
63 encoded_res = self._submitone(cmd, encoded_args_or_res)
69 return next(batchable)
64 return decode(encoded_res)
70
65
71 setattr(plain, 'batchable', f)
66 setattr(plain, 'batchable', f)
72 setattr(plain, '__name__', f.__name__)
67 setattr(plain, '__name__', f.__name__)
73 return plain
68 return plain
74
69
75
70
71 def batchable(f):
72 def upgraded(*args, **opts):
73 batchable = f(*args, **opts)
74 encoded_args_or_res, encoded_res_future = next(batchable)
75 if not encoded_res_future:
76 decode = None
77 else:
78
79 def decode(d):
80 encoded_res_future.set(d)
81 return next(batchable)
82
83 return encoded_args_or_res, decode
84
85 setattr(upgraded, '__name__', f.__name__)
86 return batchable_new_style(upgraded)
87
88
76 class future(object):
89 class future(object):
77 '''placeholder for a value to be set later'''
90 '''placeholder for a value to be set later'''
78
91
79 def set(self, value):
92 def set(self, value):
80 if util.safehasattr(self, b'value'):
93 if util.safehasattr(self, b'value'):
81 raise error.RepoError(b"future is already set")
94 raise error.RepoError(b"future is already set")
82 self.value = value
95 self.value = value
83
96
84
97
85 def encodebatchcmds(req):
98 def encodebatchcmds(req):
86 """Return a ``cmds`` argument value for the ``batch`` command."""
99 """Return a ``cmds`` argument value for the ``batch`` command."""
87 escapearg = wireprototypes.escapebatcharg
100 escapearg = wireprototypes.escapebatcharg
88
101
89 cmds = []
102 cmds = []
90 for op, argsdict in req:
103 for op, argsdict in req:
91 # Old servers didn't properly unescape argument names. So prevent
104 # Old servers didn't properly unescape argument names. So prevent
92 # the sending of argument names that may not be decoded properly by
105 # the sending of argument names that may not be decoded properly by
93 # servers.
106 # servers.
94 assert all(escapearg(k) == k for k in argsdict)
107 assert all(escapearg(k) == k for k in argsdict)
95
108
96 args = b','.join(
109 args = b','.join(
97 b'%s=%s' % (escapearg(k), escapearg(v))
110 b'%s=%s' % (escapearg(k), escapearg(v))
98 for k, v in pycompat.iteritems(argsdict)
111 for k, v in pycompat.iteritems(argsdict)
99 )
112 )
100 cmds.append(b'%s %s' % (op, args))
113 cmds.append(b'%s %s' % (op, args))
101
114
102 return b';'.join(cmds)
115 return b';'.join(cmds)
103
116
104
117
105 class unsentfuture(pycompat.futures.Future):
118 class unsentfuture(pycompat.futures.Future):
106 """A Future variation to represent an unsent command.
119 """A Future variation to represent an unsent command.
107
120
108 Because we buffer commands and don't submit them immediately, calling
121 Because we buffer commands and don't submit them immediately, calling
109 ``result()`` on an unsent future could deadlock. Futures for buffered
122 ``result()`` on an unsent future could deadlock. Futures for buffered
110 commands are represented by this type, which wraps ``result()`` to
123 commands are represented by this type, which wraps ``result()`` to
111 call ``sendcommands()``.
124 call ``sendcommands()``.
112 """
125 """
113
126
114 def result(self, timeout=None):
127 def result(self, timeout=None):
115 if self.done():
128 if self.done():
116 return pycompat.futures.Future.result(self, timeout)
129 return pycompat.futures.Future.result(self, timeout)
117
130
118 self._peerexecutor.sendcommands()
131 self._peerexecutor.sendcommands()
119
132
120 # This looks like it will infinitely recurse. However,
133 # This looks like it will infinitely recurse. However,
121 # sendcommands() should modify __class__. This call serves as a check
134 # sendcommands() should modify __class__. This call serves as a check
122 # on that.
135 # on that.
123 return self.result(timeout)
136 return self.result(timeout)
124
137
125
138
126 @interfaceutil.implementer(repository.ipeercommandexecutor)
139 @interfaceutil.implementer(repository.ipeercommandexecutor)
127 class peerexecutor(object):
140 class peerexecutor(object):
128 def __init__(self, peer):
141 def __init__(self, peer):
129 self._peer = peer
142 self._peer = peer
130 self._sent = False
143 self._sent = False
131 self._closed = False
144 self._closed = False
132 self._calls = []
145 self._calls = []
133 self._futures = weakref.WeakSet()
146 self._futures = weakref.WeakSet()
134 self._responseexecutor = None
147 self._responseexecutor = None
135 self._responsef = None
148 self._responsef = None
136
149
137 def __enter__(self):
150 def __enter__(self):
138 return self
151 return self
139
152
140 def __exit__(self, exctype, excvalee, exctb):
153 def __exit__(self, exctype, excvalee, exctb):
141 self.close()
154 self.close()
142
155
143 def callcommand(self, command, args):
156 def callcommand(self, command, args):
144 if self._sent:
157 if self._sent:
145 raise error.ProgrammingError(
158 raise error.ProgrammingError(
146 b'callcommand() cannot be used after commands are sent'
159 b'callcommand() cannot be used after commands are sent'
147 )
160 )
148
161
149 if self._closed:
162 if self._closed:
150 raise error.ProgrammingError(
163 raise error.ProgrammingError(
151 b'callcommand() cannot be used after close()'
164 b'callcommand() cannot be used after close()'
152 )
165 )
153
166
154 # Commands are dispatched through methods on the peer.
167 # Commands are dispatched through methods on the peer.
155 fn = getattr(self._peer, pycompat.sysstr(command), None)
168 fn = getattr(self._peer, pycompat.sysstr(command), None)
156
169
157 if not fn:
170 if not fn:
158 raise error.ProgrammingError(
171 raise error.ProgrammingError(
159 b'cannot call command %s: method of same name not available '
172 b'cannot call command %s: method of same name not available '
160 b'on peer' % command
173 b'on peer' % command
161 )
174 )
162
175
163 # Commands are either batchable or they aren't. If a command
176 # Commands are either batchable or they aren't. If a command
164 # isn't batchable, we send it immediately because the executor
177 # isn't batchable, we send it immediately because the executor
165 # can no longer accept new commands after a non-batchable command.
178 # can no longer accept new commands after a non-batchable command.
166 # If a command is batchable, we queue it for later. But we have
179 # If a command is batchable, we queue it for later. But we have
167 # to account for the case of a non-batchable command arriving after
180 # to account for the case of a non-batchable command arriving after
168 # a batchable one and refuse to service it.
181 # a batchable one and refuse to service it.
169
182
170 def addcall():
183 def addcall():
171 f = pycompat.futures.Future()
184 f = pycompat.futures.Future()
172 self._futures.add(f)
185 self._futures.add(f)
173 self._calls.append((command, args, fn, f))
186 self._calls.append((command, args, fn, f))
174 return f
187 return f
175
188
176 if getattr(fn, 'batchable', False):
189 if getattr(fn, 'batchable', False):
177 f = addcall()
190 f = addcall()
178
191
179 # But since we don't issue it immediately, we wrap its result()
192 # But since we don't issue it immediately, we wrap its result()
180 # to trigger sending so we avoid deadlocks.
193 # to trigger sending so we avoid deadlocks.
181 f.__class__ = unsentfuture
194 f.__class__ = unsentfuture
182 f._peerexecutor = self
195 f._peerexecutor = self
183 else:
196 else:
184 if self._calls:
197 if self._calls:
185 raise error.ProgrammingError(
198 raise error.ProgrammingError(
186 b'%s is not batchable and cannot be called on a command '
199 b'%s is not batchable and cannot be called on a command '
187 b'executor along with other commands' % command
200 b'executor along with other commands' % command
188 )
201 )
189
202
190 f = addcall()
203 f = addcall()
191
204
192 # Non-batchable commands can never coexist with another command
205 # Non-batchable commands can never coexist with another command
193 # in this executor. So send the command immediately.
206 # in this executor. So send the command immediately.
194 self.sendcommands()
207 self.sendcommands()
195
208
196 return f
209 return f
197
210
198 def sendcommands(self):
211 def sendcommands(self):
199 if self._sent:
212 if self._sent:
200 return
213 return
201
214
202 if not self._calls:
215 if not self._calls:
203 return
216 return
204
217
205 self._sent = True
218 self._sent = True
206
219
207 # Unhack any future types so caller seens a clean type and to break
220 # Unhack any future types so caller seens a clean type and to break
208 # cycle between us and futures.
221 # cycle between us and futures.
209 for f in self._futures:
222 for f in self._futures:
210 if isinstance(f, unsentfuture):
223 if isinstance(f, unsentfuture):
211 f.__class__ = pycompat.futures.Future
224 f.__class__ = pycompat.futures.Future
212 f._peerexecutor = None
225 f._peerexecutor = None
213
226
214 calls = self._calls
227 calls = self._calls
215 # Mainly to destroy references to futures.
228 # Mainly to destroy references to futures.
216 self._calls = None
229 self._calls = None
217
230
218 # Simple case of a single command. We call it synchronously.
231 # Simple case of a single command. We call it synchronously.
219 if len(calls) == 1:
232 if len(calls) == 1:
220 command, args, fn, f = calls[0]
233 command, args, fn, f = calls[0]
221
234
222 # Future was cancelled. Ignore it.
235 # Future was cancelled. Ignore it.
223 if not f.set_running_or_notify_cancel():
236 if not f.set_running_or_notify_cancel():
224 return
237 return
225
238
226 try:
239 try:
227 result = fn(**pycompat.strkwargs(args))
240 result = fn(**pycompat.strkwargs(args))
228 except Exception:
241 except Exception:
229 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
242 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
230 else:
243 else:
231 f.set_result(result)
244 f.set_result(result)
232
245
233 return
246 return
234
247
235 # Batch commands are a bit harder. First, we have to deal with the
248 # Batch commands are a bit harder. First, we have to deal with the
236 # @batchable coroutine. That's a bit annoying. Furthermore, we also
249 # @batchable coroutine. That's a bit annoying. Furthermore, we also
237 # need to preserve streaming. i.e. it should be possible for the
250 # need to preserve streaming. i.e. it should be possible for the
238 # futures to resolve as data is coming in off the wire without having
251 # futures to resolve as data is coming in off the wire without having
239 # to wait for the final byte of the final response. We do this by
252 # to wait for the final byte of the final response. We do this by
240 # spinning up a thread to read the responses.
253 # spinning up a thread to read the responses.
241
254
242 requests = []
255 requests = []
243 states = []
256 states = []
244
257
245 for command, args, fn, f in calls:
258 for command, args, fn, f in calls:
246 # Future was cancelled. Ignore it.
259 # Future was cancelled. Ignore it.
247 if not f.set_running_or_notify_cancel():
260 if not f.set_running_or_notify_cancel():
248 continue
261 continue
249
262
250 try:
263 try:
251 batchable = fn.batchable(
264 encoded_args_or_res, decode = fn.batchable(
252 fn.__self__, **pycompat.strkwargs(args)
265 fn.__self__, **pycompat.strkwargs(args)
253 )
266 )
254 except Exception:
267 except Exception:
255 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
268 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
256 return
269 return
257
270
258 # Encoded arguments and future holding remote result.
271 if not decode:
259 try:
260 encoded_args_or_res, fremote = next(batchable)
261 except Exception:
262 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
263 return
264
265 if not fremote:
266 f.set_result(encoded_args_or_res)
272 f.set_result(encoded_args_or_res)
267 else:
273 else:
268 requests.append((command, encoded_args_or_res))
274 requests.append((command, encoded_args_or_res))
269 states.append((command, f, batchable, fremote))
275 states.append((command, f, batchable, decode))
270
276
271 if not requests:
277 if not requests:
272 return
278 return
273
279
274 # This will emit responses in order they were executed.
280 # This will emit responses in order they were executed.
275 wireresults = self._peer._submitbatch(requests)
281 wireresults = self._peer._submitbatch(requests)
276
282
277 # The use of a thread pool executor here is a bit weird for something
283 # The use of a thread pool executor here is a bit weird for something
278 # that only spins up a single thread. However, thread management is
284 # that only spins up a single thread. However, thread management is
279 # hard and it is easy to encounter race conditions, deadlocks, etc.
285 # hard and it is easy to encounter race conditions, deadlocks, etc.
280 # concurrent.futures already solves these problems and its thread pool
286 # concurrent.futures already solves these problems and its thread pool
281 # executor has minimal overhead. So we use it.
287 # executor has minimal overhead. So we use it.
282 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
288 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
283 self._responsef = self._responseexecutor.submit(
289 self._responsef = self._responseexecutor.submit(
284 self._readbatchresponse, states, wireresults
290 self._readbatchresponse, states, wireresults
285 )
291 )
286
292
287 def close(self):
293 def close(self):
288 self.sendcommands()
294 self.sendcommands()
289
295
290 if self._closed:
296 if self._closed:
291 return
297 return
292
298
293 self._closed = True
299 self._closed = True
294
300
295 if not self._responsef:
301 if not self._responsef:
296 return
302 return
297
303
298 # We need to wait on our in-flight response and then shut down the
304 # We need to wait on our in-flight response and then shut down the
299 # executor once we have a result.
305 # executor once we have a result.
300 try:
306 try:
301 self._responsef.result()
307 self._responsef.result()
302 finally:
308 finally:
303 self._responseexecutor.shutdown(wait=True)
309 self._responseexecutor.shutdown(wait=True)
304 self._responsef = None
310 self._responsef = None
305 self._responseexecutor = None
311 self._responseexecutor = None
306
312
307 # If any of our futures are still in progress, mark them as
313 # If any of our futures are still in progress, mark them as
308 # errored. Otherwise a result() could wait indefinitely.
314 # errored. Otherwise a result() could wait indefinitely.
309 for f in self._futures:
315 for f in self._futures:
310 if not f.done():
316 if not f.done():
311 f.set_exception(
317 f.set_exception(
312 error.ResponseError(
318 error.ResponseError(
313 _(b'unfulfilled batch command response'), None
319 _(b'unfulfilled batch command response'), None
314 )
320 )
315 )
321 )
316
322
317 self._futures = None
323 self._futures = None
318
324
319 def _readbatchresponse(self, states, wireresults):
325 def _readbatchresponse(self, states, wireresults):
320 # Executes in a thread to read data off the wire.
326 # Executes in a thread to read data off the wire.
321
327
322 for command, f, batchable, fremote in states:
328 for command, f, batchable, decode in states:
323 # Grab raw result off the wire and teach the internal future
329 # Grab raw result off the wire and teach the internal future
324 # about it.
330 # about it.
325 try:
331 try:
326 remoteresult = next(wireresults)
332 remoteresult = next(wireresults)
327 except StopIteration:
333 except StopIteration:
328 # This can happen in particular because next(batchable)
334 # This can happen in particular because next(batchable)
329 # in the previous iteration can call peer._abort, which
335 # in the previous iteration can call peer._abort, which
330 # may close the peer.
336 # may close the peer.
331 f.set_exception(
337 f.set_exception(
332 error.ResponseError(
338 error.ResponseError(
333 _(b'unfulfilled batch command response'), None
339 _(b'unfulfilled batch command response'), None
334 )
340 )
335 )
341 )
336 else:
342 else:
337 fremote.set(remoteresult)
338
339 # And ask the coroutine to decode that value.
340 try:
343 try:
341 result = next(batchable)
344 result = decode(remoteresult)
342 except Exception:
345 except Exception:
343 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
346 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
344 else:
347 else:
345 f.set_result(result)
348 f.set_result(result)
346
349
347
350
348 @interfaceutil.implementer(
351 @interfaceutil.implementer(
349 repository.ipeercommands, repository.ipeerlegacycommands
352 repository.ipeercommands, repository.ipeerlegacycommands
350 )
353 )
351 class wirepeer(repository.peer):
354 class wirepeer(repository.peer):
352 """Client-side interface for communicating with a peer repository.
355 """Client-side interface for communicating with a peer repository.
353
356
354 Methods commonly call wire protocol commands of the same name.
357 Methods commonly call wire protocol commands of the same name.
355
358
356 See also httppeer.py and sshpeer.py for protocol-specific
359 See also httppeer.py and sshpeer.py for protocol-specific
357 implementations of this interface.
360 implementations of this interface.
358 """
361 """
359
362
360 def commandexecutor(self):
363 def commandexecutor(self):
361 return peerexecutor(self)
364 return peerexecutor(self)
362
365
363 # Begin of ipeercommands interface.
366 # Begin of ipeercommands interface.
364
367
365 def clonebundles(self):
368 def clonebundles(self):
366 self.requirecap(b'clonebundles', _(b'clone bundles'))
369 self.requirecap(b'clonebundles', _(b'clone bundles'))
367 return self._call(b'clonebundles')
370 return self._call(b'clonebundles')
368
371
369 @batchable
372 @batchable
370 def lookup(self, key):
373 def lookup(self, key):
371 self.requirecap(b'lookup', _(b'look up remote revision'))
374 self.requirecap(b'lookup', _(b'look up remote revision'))
372 f = future()
375 f = future()
373 yield {b'key': encoding.fromlocal(key)}, f
376 yield {b'key': encoding.fromlocal(key)}, f
374 d = f.value
377 d = f.value
375 success, data = d[:-1].split(b" ", 1)
378 success, data = d[:-1].split(b" ", 1)
376 if int(success):
379 if int(success):
377 yield bin(data)
380 yield bin(data)
378 else:
381 else:
379 self._abort(error.RepoError(data))
382 self._abort(error.RepoError(data))
380
383
381 @batchable
384 @batchable
382 def heads(self):
385 def heads(self):
383 f = future()
386 f = future()
384 yield {}, f
387 yield {}, f
385 d = f.value
388 d = f.value
386 try:
389 try:
387 yield wireprototypes.decodelist(d[:-1])
390 yield wireprototypes.decodelist(d[:-1])
388 except ValueError:
391 except ValueError:
389 self._abort(error.ResponseError(_(b"unexpected response:"), d))
392 self._abort(error.ResponseError(_(b"unexpected response:"), d))
390
393
391 @batchable
394 @batchable
392 def known(self, nodes):
395 def known(self, nodes):
393 f = future()
396 f = future()
394 yield {b'nodes': wireprototypes.encodelist(nodes)}, f
397 yield {b'nodes': wireprototypes.encodelist(nodes)}, f
395 d = f.value
398 d = f.value
396 try:
399 try:
397 yield [bool(int(b)) for b in pycompat.iterbytestr(d)]
400 yield [bool(int(b)) for b in pycompat.iterbytestr(d)]
398 except ValueError:
401 except ValueError:
399 self._abort(error.ResponseError(_(b"unexpected response:"), d))
402 self._abort(error.ResponseError(_(b"unexpected response:"), d))
400
403
401 @batchable
404 @batchable
402 def branchmap(self):
405 def branchmap(self):
403 f = future()
406 f = future()
404 yield {}, f
407 yield {}, f
405 d = f.value
408 d = f.value
406 try:
409 try:
407 branchmap = {}
410 branchmap = {}
408 for branchpart in d.splitlines():
411 for branchpart in d.splitlines():
409 branchname, branchheads = branchpart.split(b' ', 1)
412 branchname, branchheads = branchpart.split(b' ', 1)
410 branchname = encoding.tolocal(urlreq.unquote(branchname))
413 branchname = encoding.tolocal(urlreq.unquote(branchname))
411 branchheads = wireprototypes.decodelist(branchheads)
414 branchheads = wireprototypes.decodelist(branchheads)
412 branchmap[branchname] = branchheads
415 branchmap[branchname] = branchheads
413 yield branchmap
416 yield branchmap
414 except TypeError:
417 except TypeError:
415 self._abort(error.ResponseError(_(b"unexpected response:"), d))
418 self._abort(error.ResponseError(_(b"unexpected response:"), d))
416
419
417 @batchable
420 @batchable
418 def listkeys(self, namespace):
421 def listkeys(self, namespace):
419 if not self.capable(b'pushkey'):
422 if not self.capable(b'pushkey'):
420 yield {}, None
423 yield {}, None
421 f = future()
424 f = future()
422 self.ui.debug(b'preparing listkeys for "%s"\n' % namespace)
425 self.ui.debug(b'preparing listkeys for "%s"\n' % namespace)
423 yield {b'namespace': encoding.fromlocal(namespace)}, f
426 yield {b'namespace': encoding.fromlocal(namespace)}, f
424 d = f.value
427 d = f.value
425 self.ui.debug(
428 self.ui.debug(
426 b'received listkey for "%s": %i bytes\n' % (namespace, len(d))
429 b'received listkey for "%s": %i bytes\n' % (namespace, len(d))
427 )
430 )
428 yield pushkeymod.decodekeys(d)
431 yield pushkeymod.decodekeys(d)
429
432
430 @batchable
433 @batchable
431 def pushkey(self, namespace, key, old, new):
434 def pushkey(self, namespace, key, old, new):
432 if not self.capable(b'pushkey'):
435 if not self.capable(b'pushkey'):
433 yield False, None
436 yield False, None
434 f = future()
437 f = future()
435 self.ui.debug(b'preparing pushkey for "%s:%s"\n' % (namespace, key))
438 self.ui.debug(b'preparing pushkey for "%s:%s"\n' % (namespace, key))
436 yield {
439 yield {
437 b'namespace': encoding.fromlocal(namespace),
440 b'namespace': encoding.fromlocal(namespace),
438 b'key': encoding.fromlocal(key),
441 b'key': encoding.fromlocal(key),
439 b'old': encoding.fromlocal(old),
442 b'old': encoding.fromlocal(old),
440 b'new': encoding.fromlocal(new),
443 b'new': encoding.fromlocal(new),
441 }, f
444 }, f
442 d = f.value
445 d = f.value
443 d, output = d.split(b'\n', 1)
446 d, output = d.split(b'\n', 1)
444 try:
447 try:
445 d = bool(int(d))
448 d = bool(int(d))
446 except ValueError:
449 except ValueError:
447 raise error.ResponseError(
450 raise error.ResponseError(
448 _(b'push failed (unexpected response):'), d
451 _(b'push failed (unexpected response):'), d
449 )
452 )
450 for l in output.splitlines(True):
453 for l in output.splitlines(True):
451 self.ui.status(_(b'remote: '), l)
454 self.ui.status(_(b'remote: '), l)
452 yield d
455 yield d
453
456
454 def stream_out(self):
457 def stream_out(self):
455 return self._callstream(b'stream_out')
458 return self._callstream(b'stream_out')
456
459
457 def getbundle(self, source, **kwargs):
460 def getbundle(self, source, **kwargs):
458 kwargs = pycompat.byteskwargs(kwargs)
461 kwargs = pycompat.byteskwargs(kwargs)
459 self.requirecap(b'getbundle', _(b'look up remote changes'))
462 self.requirecap(b'getbundle', _(b'look up remote changes'))
460 opts = {}
463 opts = {}
461 bundlecaps = kwargs.get(b'bundlecaps') or set()
464 bundlecaps = kwargs.get(b'bundlecaps') or set()
462 for key, value in pycompat.iteritems(kwargs):
465 for key, value in pycompat.iteritems(kwargs):
463 if value is None:
466 if value is None:
464 continue
467 continue
465 keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
468 keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
466 if keytype is None:
469 if keytype is None:
467 raise error.ProgrammingError(
470 raise error.ProgrammingError(
468 b'Unexpectedly None keytype for key %s' % key
471 b'Unexpectedly None keytype for key %s' % key
469 )
472 )
470 elif keytype == b'nodes':
473 elif keytype == b'nodes':
471 value = wireprototypes.encodelist(value)
474 value = wireprototypes.encodelist(value)
472 elif keytype == b'csv':
475 elif keytype == b'csv':
473 value = b','.join(value)
476 value = b','.join(value)
474 elif keytype == b'scsv':
477 elif keytype == b'scsv':
475 value = b','.join(sorted(value))
478 value = b','.join(sorted(value))
476 elif keytype == b'boolean':
479 elif keytype == b'boolean':
477 value = b'%i' % bool(value)
480 value = b'%i' % bool(value)
478 elif keytype != b'plain':
481 elif keytype != b'plain':
479 raise KeyError(b'unknown getbundle option type %s' % keytype)
482 raise KeyError(b'unknown getbundle option type %s' % keytype)
480 opts[key] = value
483 opts[key] = value
481 f = self._callcompressable(b"getbundle", **pycompat.strkwargs(opts))
484 f = self._callcompressable(b"getbundle", **pycompat.strkwargs(opts))
482 if any((cap.startswith(b'HG2') for cap in bundlecaps)):
485 if any((cap.startswith(b'HG2') for cap in bundlecaps)):
483 return bundle2.getunbundler(self.ui, f)
486 return bundle2.getunbundler(self.ui, f)
484 else:
487 else:
485 return changegroupmod.cg1unpacker(f, b'UN')
488 return changegroupmod.cg1unpacker(f, b'UN')
486
489
487 def unbundle(self, bundle, heads, url):
490 def unbundle(self, bundle, heads, url):
488 """Send cg (a readable file-like object representing the
491 """Send cg (a readable file-like object representing the
489 changegroup to push, typically a chunkbuffer object) to the
492 changegroup to push, typically a chunkbuffer object) to the
490 remote server as a bundle.
493 remote server as a bundle.
491
494
492 When pushing a bundle10 stream, return an integer indicating the
495 When pushing a bundle10 stream, return an integer indicating the
493 result of the push (see changegroup.apply()).
496 result of the push (see changegroup.apply()).
494
497
495 When pushing a bundle20 stream, return a bundle20 stream.
498 When pushing a bundle20 stream, return a bundle20 stream.
496
499
497 `url` is the url the client thinks it's pushing to, which is
500 `url` is the url the client thinks it's pushing to, which is
498 visible to hooks.
501 visible to hooks.
499 """
502 """
500
503
501 if heads != [b'force'] and self.capable(b'unbundlehash'):
504 if heads != [b'force'] and self.capable(b'unbundlehash'):
502 heads = wireprototypes.encodelist(
505 heads = wireprototypes.encodelist(
503 [b'hashed', hashutil.sha1(b''.join(sorted(heads))).digest()]
506 [b'hashed', hashutil.sha1(b''.join(sorted(heads))).digest()]
504 )
507 )
505 else:
508 else:
506 heads = wireprototypes.encodelist(heads)
509 heads = wireprototypes.encodelist(heads)
507
510
508 if util.safehasattr(bundle, b'deltaheader'):
511 if util.safehasattr(bundle, b'deltaheader'):
509 # this a bundle10, do the old style call sequence
512 # this a bundle10, do the old style call sequence
510 ret, output = self._callpush(b"unbundle", bundle, heads=heads)
513 ret, output = self._callpush(b"unbundle", bundle, heads=heads)
511 if ret == b"":
514 if ret == b"":
512 raise error.ResponseError(_(b'push failed:'), output)
515 raise error.ResponseError(_(b'push failed:'), output)
513 try:
516 try:
514 ret = int(ret)
517 ret = int(ret)
515 except ValueError:
518 except ValueError:
516 raise error.ResponseError(
519 raise error.ResponseError(
517 _(b'push failed (unexpected response):'), ret
520 _(b'push failed (unexpected response):'), ret
518 )
521 )
519
522
520 for l in output.splitlines(True):
523 for l in output.splitlines(True):
521 self.ui.status(_(b'remote: '), l)
524 self.ui.status(_(b'remote: '), l)
522 else:
525 else:
523 # bundle2 push. Send a stream, fetch a stream.
526 # bundle2 push. Send a stream, fetch a stream.
524 stream = self._calltwowaystream(b'unbundle', bundle, heads=heads)
527 stream = self._calltwowaystream(b'unbundle', bundle, heads=heads)
525 ret = bundle2.getunbundler(self.ui, stream)
528 ret = bundle2.getunbundler(self.ui, stream)
526 return ret
529 return ret
527
530
528 # End of ipeercommands interface.
531 # End of ipeercommands interface.
529
532
530 # Begin of ipeerlegacycommands interface.
533 # Begin of ipeerlegacycommands interface.
531
534
532 def branches(self, nodes):
535 def branches(self, nodes):
533 n = wireprototypes.encodelist(nodes)
536 n = wireprototypes.encodelist(nodes)
534 d = self._call(b"branches", nodes=n)
537 d = self._call(b"branches", nodes=n)
535 try:
538 try:
536 br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
539 br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
537 return br
540 return br
538 except ValueError:
541 except ValueError:
539 self._abort(error.ResponseError(_(b"unexpected response:"), d))
542 self._abort(error.ResponseError(_(b"unexpected response:"), d))
540
543
541 def between(self, pairs):
544 def between(self, pairs):
542 batch = 8 # avoid giant requests
545 batch = 8 # avoid giant requests
543 r = []
546 r = []
544 for i in pycompat.xrange(0, len(pairs), batch):
547 for i in pycompat.xrange(0, len(pairs), batch):
545 n = b" ".join(
548 n = b" ".join(
546 [
549 [
547 wireprototypes.encodelist(p, b'-')
550 wireprototypes.encodelist(p, b'-')
548 for p in pairs[i : i + batch]
551 for p in pairs[i : i + batch]
549 ]
552 ]
550 )
553 )
551 d = self._call(b"between", pairs=n)
554 d = self._call(b"between", pairs=n)
552 try:
555 try:
553 r.extend(
556 r.extend(
554 l and wireprototypes.decodelist(l) or []
557 l and wireprototypes.decodelist(l) or []
555 for l in d.splitlines()
558 for l in d.splitlines()
556 )
559 )
557 except ValueError:
560 except ValueError:
558 self._abort(error.ResponseError(_(b"unexpected response:"), d))
561 self._abort(error.ResponseError(_(b"unexpected response:"), d))
559 return r
562 return r
560
563
561 def changegroup(self, nodes, source):
564 def changegroup(self, nodes, source):
562 n = wireprototypes.encodelist(nodes)
565 n = wireprototypes.encodelist(nodes)
563 f = self._callcompressable(b"changegroup", roots=n)
566 f = self._callcompressable(b"changegroup", roots=n)
564 return changegroupmod.cg1unpacker(f, b'UN')
567 return changegroupmod.cg1unpacker(f, b'UN')
565
568
566 def changegroupsubset(self, bases, heads, source):
569 def changegroupsubset(self, bases, heads, source):
567 self.requirecap(b'changegroupsubset', _(b'look up remote changes'))
570 self.requirecap(b'changegroupsubset', _(b'look up remote changes'))
568 bases = wireprototypes.encodelist(bases)
571 bases = wireprototypes.encodelist(bases)
569 heads = wireprototypes.encodelist(heads)
572 heads = wireprototypes.encodelist(heads)
570 f = self._callcompressable(
573 f = self._callcompressable(
571 b"changegroupsubset", bases=bases, heads=heads
574 b"changegroupsubset", bases=bases, heads=heads
572 )
575 )
573 return changegroupmod.cg1unpacker(f, b'UN')
576 return changegroupmod.cg1unpacker(f, b'UN')
574
577
575 # End of ipeerlegacycommands interface.
578 # End of ipeerlegacycommands interface.
576
579
577 def _submitbatch(self, req):
580 def _submitbatch(self, req):
578 """run batch request <req> on the server
581 """run batch request <req> on the server
579
582
580 Returns an iterator of the raw responses from the server.
583 Returns an iterator of the raw responses from the server.
581 """
584 """
582 ui = self.ui
585 ui = self.ui
583 if ui.debugflag and ui.configbool(b'devel', b'debug.peer-request'):
586 if ui.debugflag and ui.configbool(b'devel', b'debug.peer-request'):
584 ui.debug(b'devel-peer-request: batched-content\n')
587 ui.debug(b'devel-peer-request: batched-content\n')
585 for op, args in req:
588 for op, args in req:
586 msg = b'devel-peer-request: - %s (%d arguments)\n'
589 msg = b'devel-peer-request: - %s (%d arguments)\n'
587 ui.debug(msg % (op, len(args)))
590 ui.debug(msg % (op, len(args)))
588
591
589 unescapearg = wireprototypes.unescapebatcharg
592 unescapearg = wireprototypes.unescapebatcharg
590
593
591 rsp = self._callstream(b"batch", cmds=encodebatchcmds(req))
594 rsp = self._callstream(b"batch", cmds=encodebatchcmds(req))
592 chunk = rsp.read(1024)
595 chunk = rsp.read(1024)
593 work = [chunk]
596 work = [chunk]
594 while chunk:
597 while chunk:
595 while b';' not in chunk and chunk:
598 while b';' not in chunk and chunk:
596 chunk = rsp.read(1024)
599 chunk = rsp.read(1024)
597 work.append(chunk)
600 work.append(chunk)
598 merged = b''.join(work)
601 merged = b''.join(work)
599 while b';' in merged:
602 while b';' in merged:
600 one, merged = merged.split(b';', 1)
603 one, merged = merged.split(b';', 1)
601 yield unescapearg(one)
604 yield unescapearg(one)
602 chunk = rsp.read(1024)
605 chunk = rsp.read(1024)
603 work = [merged, chunk]
606 work = [merged, chunk]
604 yield unescapearg(b''.join(work))
607 yield unescapearg(b''.join(work))
605
608
606 def _submitone(self, op, args):
609 def _submitone(self, op, args):
607 return self._call(op, **pycompat.strkwargs(args))
610 return self._call(op, **pycompat.strkwargs(args))
608
611
609 def debugwireargs(self, one, two, three=None, four=None, five=None):
612 def debugwireargs(self, one, two, three=None, four=None, five=None):
610 # don't pass optional arguments left at their default value
613 # don't pass optional arguments left at their default value
611 opts = {}
614 opts = {}
612 if three is not None:
615 if three is not None:
613 opts['three'] = three
616 opts['three'] = three
614 if four is not None:
617 if four is not None:
615 opts['four'] = four
618 opts['four'] = four
616 return self._call(b'debugwireargs', one=one, two=two, **opts)
619 return self._call(b'debugwireargs', one=one, two=two, **opts)
617
620
618 def _call(self, cmd, **args):
621 def _call(self, cmd, **args):
619 """execute <cmd> on the server
622 """execute <cmd> on the server
620
623
621 The command is expected to return a simple string.
624 The command is expected to return a simple string.
622
625
623 returns the server reply as a string."""
626 returns the server reply as a string."""
624 raise NotImplementedError()
627 raise NotImplementedError()
625
628
626 def _callstream(self, cmd, **args):
629 def _callstream(self, cmd, **args):
627 """execute <cmd> on the server
630 """execute <cmd> on the server
628
631
629 The command is expected to return a stream. Note that if the
632 The command is expected to return a stream. Note that if the
630 command doesn't return a stream, _callstream behaves
633 command doesn't return a stream, _callstream behaves
631 differently for ssh and http peers.
634 differently for ssh and http peers.
632
635
633 returns the server reply as a file like object.
636 returns the server reply as a file like object.
634 """
637 """
635 raise NotImplementedError()
638 raise NotImplementedError()
636
639
637 def _callcompressable(self, cmd, **args):
640 def _callcompressable(self, cmd, **args):
638 """execute <cmd> on the server
641 """execute <cmd> on the server
639
642
640 The command is expected to return a stream.
643 The command is expected to return a stream.
641
644
642 The stream may have been compressed in some implementations. This
645 The stream may have been compressed in some implementations. This
643 function takes care of the decompression. This is the only difference
646 function takes care of the decompression. This is the only difference
644 with _callstream.
647 with _callstream.
645
648
646 returns the server reply as a file like object.
649 returns the server reply as a file like object.
647 """
650 """
648 raise NotImplementedError()
651 raise NotImplementedError()
649
652
650 def _callpush(self, cmd, fp, **args):
653 def _callpush(self, cmd, fp, **args):
651 """execute a <cmd> on server
654 """execute a <cmd> on server
652
655
653 The command is expected to be related to a push. Push has a special
656 The command is expected to be related to a push. Push has a special
654 return method.
657 return method.
655
658
656 returns the server reply as a (ret, output) tuple. ret is either
659 returns the server reply as a (ret, output) tuple. ret is either
657 empty (error) or a stringified int.
660 empty (error) or a stringified int.
658 """
661 """
659 raise NotImplementedError()
662 raise NotImplementedError()
660
663
661 def _calltwowaystream(self, cmd, fp, **args):
664 def _calltwowaystream(self, cmd, fp, **args):
662 """execute <cmd> on server
665 """execute <cmd> on server
663
666
664 The command will send a stream to the server and get a stream in reply.
667 The command will send a stream to the server and get a stream in reply.
665 """
668 """
666 raise NotImplementedError()
669 raise NotImplementedError()
667
670
668 def _abort(self, exception):
671 def _abort(self, exception):
669 """clearly abort the wire protocol connection and raise the exception"""
672 """clearly abort the wire protocol connection and raise the exception"""
670 raise NotImplementedError()
673 raise NotImplementedError()
General Comments 0
You need to be logged in to leave comments. Login now