##// END OF EJS Templates
safehasattr: pass attribute name as string instead of bytes...
marmoute -
r51444:b1fb4185 default
parent child Browse files
Show More
@@ -1,648 +1,648 b''
1 # wireprotov1peer.py - Client-side functionality for wire protocol version 1.
1 # wireprotov1peer.py - Client-side functionality for wire protocol version 1.
2 #
2 #
3 # Copyright 2005-2010 Olivia Mackall <olivia@selenic.com>
3 # Copyright 2005-2010 Olivia Mackall <olivia@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8
8
9 import sys
9 import sys
10 import weakref
10 import weakref
11
11
12 from concurrent import futures
12 from concurrent import futures
13 from .i18n import _
13 from .i18n import _
14 from .node import bin
14 from .node import bin
15 from .pycompat import (
15 from .pycompat import (
16 getattr,
16 getattr,
17 setattr,
17 setattr,
18 )
18 )
19 from . import (
19 from . import (
20 bundle2,
20 bundle2,
21 changegroup as changegroupmod,
21 changegroup as changegroupmod,
22 encoding,
22 encoding,
23 error,
23 error,
24 pushkey as pushkeymod,
24 pushkey as pushkeymod,
25 pycompat,
25 pycompat,
26 util,
26 util,
27 wireprototypes,
27 wireprototypes,
28 )
28 )
29 from .interfaces import (
29 from .interfaces import (
30 repository,
30 repository,
31 util as interfaceutil,
31 util as interfaceutil,
32 )
32 )
33 from .utils import hashutil
33 from .utils import hashutil
34
34
35 urlreq = util.urlreq
35 urlreq = util.urlreq
36
36
37
37
38 def batchable(f):
38 def batchable(f):
39 """annotation for batchable methods
39 """annotation for batchable methods
40
40
41 Such methods must implement a coroutine as follows:
41 Such methods must implement a coroutine as follows:
42
42
43 @batchable
43 @batchable
44 def sample(self, one, two=None):
44 def sample(self, one, two=None):
45 # Build list of encoded arguments suitable for your wire protocol:
45 # Build list of encoded arguments suitable for your wire protocol:
46 encoded_args = [('one', encode(one),), ('two', encode(two),)]
46 encoded_args = [('one', encode(one),), ('two', encode(two),)]
47 # Return it, along with a function that will receive the result
47 # Return it, along with a function that will receive the result
48 # from the batched request.
48 # from the batched request.
49 return encoded_args, decode
49 return encoded_args, decode
50
50
51 The decorator returns a function which wraps this coroutine as a plain
51 The decorator returns a function which wraps this coroutine as a plain
52 method, but adds the original method as an attribute called "batchable",
52 method, but adds the original method as an attribute called "batchable",
53 which is used by remotebatch to split the call into separate encoding and
53 which is used by remotebatch to split the call into separate encoding and
54 decoding phases.
54 decoding phases.
55 """
55 """
56
56
57 def plain(*args, **opts):
57 def plain(*args, **opts):
58 encoded_args_or_res, decode = f(*args, **opts)
58 encoded_args_or_res, decode = f(*args, **opts)
59 if not decode:
59 if not decode:
60 return encoded_args_or_res # a local result in this case
60 return encoded_args_or_res # a local result in this case
61 self = args[0]
61 self = args[0]
62 cmd = pycompat.bytesurl(f.__name__) # ensure cmd is ascii bytestr
62 cmd = pycompat.bytesurl(f.__name__) # ensure cmd is ascii bytestr
63 encoded_res = self._submitone(cmd, encoded_args_or_res)
63 encoded_res = self._submitone(cmd, encoded_args_or_res)
64 return decode(encoded_res)
64 return decode(encoded_res)
65
65
66 setattr(plain, 'batchable', f)
66 setattr(plain, 'batchable', f)
67 setattr(plain, '__name__', f.__name__)
67 setattr(plain, '__name__', f.__name__)
68 return plain
68 return plain
69
69
70
70
71 def encodebatchcmds(req):
71 def encodebatchcmds(req):
72 """Return a ``cmds`` argument value for the ``batch`` command."""
72 """Return a ``cmds`` argument value for the ``batch`` command."""
73 escapearg = wireprototypes.escapebatcharg
73 escapearg = wireprototypes.escapebatcharg
74
74
75 cmds = []
75 cmds = []
76 for op, argsdict in req:
76 for op, argsdict in req:
77 # Old servers didn't properly unescape argument names. So prevent
77 # Old servers didn't properly unescape argument names. So prevent
78 # the sending of argument names that may not be decoded properly by
78 # the sending of argument names that may not be decoded properly by
79 # servers.
79 # servers.
80 assert all(escapearg(k) == k for k in argsdict)
80 assert all(escapearg(k) == k for k in argsdict)
81
81
82 args = b','.join(
82 args = b','.join(
83 b'%s=%s' % (escapearg(k), escapearg(v)) for k, v in argsdict.items()
83 b'%s=%s' % (escapearg(k), escapearg(v)) for k, v in argsdict.items()
84 )
84 )
85 cmds.append(b'%s %s' % (op, args))
85 cmds.append(b'%s %s' % (op, args))
86
86
87 return b';'.join(cmds)
87 return b';'.join(cmds)
88
88
89
89
90 class unsentfuture(futures.Future):
90 class unsentfuture(futures.Future):
91 """A Future variation to represent an unsent command.
91 """A Future variation to represent an unsent command.
92
92
93 Because we buffer commands and don't submit them immediately, calling
93 Because we buffer commands and don't submit them immediately, calling
94 ``result()`` on an unsent future could deadlock. Futures for buffered
94 ``result()`` on an unsent future could deadlock. Futures for buffered
95 commands are represented by this type, which wraps ``result()`` to
95 commands are represented by this type, which wraps ``result()`` to
96 call ``sendcommands()``.
96 call ``sendcommands()``.
97 """
97 """
98
98
99 def result(self, timeout=None):
99 def result(self, timeout=None):
100 if self.done():
100 if self.done():
101 return futures.Future.result(self, timeout)
101 return futures.Future.result(self, timeout)
102
102
103 self._peerexecutor.sendcommands()
103 self._peerexecutor.sendcommands()
104
104
105 # This looks like it will infinitely recurse. However,
105 # This looks like it will infinitely recurse. However,
106 # sendcommands() should modify __class__. This call serves as a check
106 # sendcommands() should modify __class__. This call serves as a check
107 # on that.
107 # on that.
108 return self.result(timeout)
108 return self.result(timeout)
109
109
110
110
111 @interfaceutil.implementer(repository.ipeercommandexecutor)
111 @interfaceutil.implementer(repository.ipeercommandexecutor)
112 class peerexecutor:
112 class peerexecutor:
113 def __init__(self, peer):
113 def __init__(self, peer):
114 self._peer = peer
114 self._peer = peer
115 self._sent = False
115 self._sent = False
116 self._closed = False
116 self._closed = False
117 self._calls = []
117 self._calls = []
118 self._futures = weakref.WeakSet()
118 self._futures = weakref.WeakSet()
119 self._responseexecutor = None
119 self._responseexecutor = None
120 self._responsef = None
120 self._responsef = None
121
121
122 def __enter__(self):
122 def __enter__(self):
123 return self
123 return self
124
124
125 def __exit__(self, exctype, excvalee, exctb):
125 def __exit__(self, exctype, excvalee, exctb):
126 self.close()
126 self.close()
127
127
128 def callcommand(self, command, args):
128 def callcommand(self, command, args):
129 if self._sent:
129 if self._sent:
130 raise error.ProgrammingError(
130 raise error.ProgrammingError(
131 b'callcommand() cannot be used after commands are sent'
131 b'callcommand() cannot be used after commands are sent'
132 )
132 )
133
133
134 if self._closed:
134 if self._closed:
135 raise error.ProgrammingError(
135 raise error.ProgrammingError(
136 b'callcommand() cannot be used after close()'
136 b'callcommand() cannot be used after close()'
137 )
137 )
138
138
139 # Commands are dispatched through methods on the peer.
139 # Commands are dispatched through methods on the peer.
140 fn = getattr(self._peer, pycompat.sysstr(command), None)
140 fn = getattr(self._peer, pycompat.sysstr(command), None)
141
141
142 if not fn:
142 if not fn:
143 raise error.ProgrammingError(
143 raise error.ProgrammingError(
144 b'cannot call command %s: method of same name not available '
144 b'cannot call command %s: method of same name not available '
145 b'on peer' % command
145 b'on peer' % command
146 )
146 )
147
147
148 # Commands are either batchable or they aren't. If a command
148 # Commands are either batchable or they aren't. If a command
149 # isn't batchable, we send it immediately because the executor
149 # isn't batchable, we send it immediately because the executor
150 # can no longer accept new commands after a non-batchable command.
150 # can no longer accept new commands after a non-batchable command.
151 # If a command is batchable, we queue it for later. But we have
151 # If a command is batchable, we queue it for later. But we have
152 # to account for the case of a non-batchable command arriving after
152 # to account for the case of a non-batchable command arriving after
153 # a batchable one and refuse to service it.
153 # a batchable one and refuse to service it.
154
154
155 def addcall():
155 def addcall():
156 f = futures.Future()
156 f = futures.Future()
157 self._futures.add(f)
157 self._futures.add(f)
158 self._calls.append((command, args, fn, f))
158 self._calls.append((command, args, fn, f))
159 return f
159 return f
160
160
161 if getattr(fn, 'batchable', False):
161 if getattr(fn, 'batchable', False):
162 f = addcall()
162 f = addcall()
163
163
164 # But since we don't issue it immediately, we wrap its result()
164 # But since we don't issue it immediately, we wrap its result()
165 # to trigger sending so we avoid deadlocks.
165 # to trigger sending so we avoid deadlocks.
166 f.__class__ = unsentfuture
166 f.__class__ = unsentfuture
167 f._peerexecutor = self
167 f._peerexecutor = self
168 else:
168 else:
169 if self._calls:
169 if self._calls:
170 raise error.ProgrammingError(
170 raise error.ProgrammingError(
171 b'%s is not batchable and cannot be called on a command '
171 b'%s is not batchable and cannot be called on a command '
172 b'executor along with other commands' % command
172 b'executor along with other commands' % command
173 )
173 )
174
174
175 f = addcall()
175 f = addcall()
176
176
177 # Non-batchable commands can never coexist with another command
177 # Non-batchable commands can never coexist with another command
178 # in this executor. So send the command immediately.
178 # in this executor. So send the command immediately.
179 self.sendcommands()
179 self.sendcommands()
180
180
181 return f
181 return f
182
182
183 def sendcommands(self):
183 def sendcommands(self):
184 if self._sent:
184 if self._sent:
185 return
185 return
186
186
187 if not self._calls:
187 if not self._calls:
188 return
188 return
189
189
190 self._sent = True
190 self._sent = True
191
191
192 # Unhack any future types so caller seens a clean type and to break
192 # Unhack any future types so caller seens a clean type and to break
193 # cycle between us and futures.
193 # cycle between us and futures.
194 for f in self._futures:
194 for f in self._futures:
195 if isinstance(f, unsentfuture):
195 if isinstance(f, unsentfuture):
196 f.__class__ = futures.Future
196 f.__class__ = futures.Future
197 f._peerexecutor = None
197 f._peerexecutor = None
198
198
199 calls = self._calls
199 calls = self._calls
200 # Mainly to destroy references to futures.
200 # Mainly to destroy references to futures.
201 self._calls = None
201 self._calls = None
202
202
203 # Simple case of a single command. We call it synchronously.
203 # Simple case of a single command. We call it synchronously.
204 if len(calls) == 1:
204 if len(calls) == 1:
205 command, args, fn, f = calls[0]
205 command, args, fn, f = calls[0]
206
206
207 # Future was cancelled. Ignore it.
207 # Future was cancelled. Ignore it.
208 if not f.set_running_or_notify_cancel():
208 if not f.set_running_or_notify_cancel():
209 return
209 return
210
210
211 try:
211 try:
212 result = fn(**pycompat.strkwargs(args))
212 result = fn(**pycompat.strkwargs(args))
213 except Exception:
213 except Exception:
214 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
214 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
215 else:
215 else:
216 f.set_result(result)
216 f.set_result(result)
217
217
218 return
218 return
219
219
220 # Batch commands are a bit harder. First, we have to deal with the
220 # Batch commands are a bit harder. First, we have to deal with the
221 # @batchable coroutine. That's a bit annoying. Furthermore, we also
221 # @batchable coroutine. That's a bit annoying. Furthermore, we also
222 # need to preserve streaming. i.e. it should be possible for the
222 # need to preserve streaming. i.e. it should be possible for the
223 # futures to resolve as data is coming in off the wire without having
223 # futures to resolve as data is coming in off the wire without having
224 # to wait for the final byte of the final response. We do this by
224 # to wait for the final byte of the final response. We do this by
225 # spinning up a thread to read the responses.
225 # spinning up a thread to read the responses.
226
226
227 requests = []
227 requests = []
228 states = []
228 states = []
229
229
230 for command, args, fn, f in calls:
230 for command, args, fn, f in calls:
231 # Future was cancelled. Ignore it.
231 # Future was cancelled. Ignore it.
232 if not f.set_running_or_notify_cancel():
232 if not f.set_running_or_notify_cancel():
233 continue
233 continue
234
234
235 try:
235 try:
236 encoded_args_or_res, decode = fn.batchable(
236 encoded_args_or_res, decode = fn.batchable(
237 fn.__self__, **pycompat.strkwargs(args)
237 fn.__self__, **pycompat.strkwargs(args)
238 )
238 )
239 except Exception:
239 except Exception:
240 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
240 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
241 return
241 return
242
242
243 if not decode:
243 if not decode:
244 f.set_result(encoded_args_or_res)
244 f.set_result(encoded_args_or_res)
245 else:
245 else:
246 requests.append((command, encoded_args_or_res))
246 requests.append((command, encoded_args_or_res))
247 states.append((command, f, batchable, decode))
247 states.append((command, f, batchable, decode))
248
248
249 if not requests:
249 if not requests:
250 return
250 return
251
251
252 # This will emit responses in order they were executed.
252 # This will emit responses in order they were executed.
253 wireresults = self._peer._submitbatch(requests)
253 wireresults = self._peer._submitbatch(requests)
254
254
255 # The use of a thread pool executor here is a bit weird for something
255 # The use of a thread pool executor here is a bit weird for something
256 # that only spins up a single thread. However, thread management is
256 # that only spins up a single thread. However, thread management is
257 # hard and it is easy to encounter race conditions, deadlocks, etc.
257 # hard and it is easy to encounter race conditions, deadlocks, etc.
258 # concurrent.futures already solves these problems and its thread pool
258 # concurrent.futures already solves these problems and its thread pool
259 # executor has minimal overhead. So we use it.
259 # executor has minimal overhead. So we use it.
260 self._responseexecutor = futures.ThreadPoolExecutor(1)
260 self._responseexecutor = futures.ThreadPoolExecutor(1)
261 self._responsef = self._responseexecutor.submit(
261 self._responsef = self._responseexecutor.submit(
262 self._readbatchresponse, states, wireresults
262 self._readbatchresponse, states, wireresults
263 )
263 )
264
264
265 def close(self):
265 def close(self):
266 self.sendcommands()
266 self.sendcommands()
267
267
268 if self._closed:
268 if self._closed:
269 return
269 return
270
270
271 self._closed = True
271 self._closed = True
272
272
273 if not self._responsef:
273 if not self._responsef:
274 return
274 return
275
275
276 # We need to wait on our in-flight response and then shut down the
276 # We need to wait on our in-flight response and then shut down the
277 # executor once we have a result.
277 # executor once we have a result.
278 try:
278 try:
279 self._responsef.result()
279 self._responsef.result()
280 finally:
280 finally:
281 self._responseexecutor.shutdown(wait=True)
281 self._responseexecutor.shutdown(wait=True)
282 self._responsef = None
282 self._responsef = None
283 self._responseexecutor = None
283 self._responseexecutor = None
284
284
285 # If any of our futures are still in progress, mark them as
285 # If any of our futures are still in progress, mark them as
286 # errored. Otherwise a result() could wait indefinitely.
286 # errored. Otherwise a result() could wait indefinitely.
287 for f in self._futures:
287 for f in self._futures:
288 if not f.done():
288 if not f.done():
289 f.set_exception(
289 f.set_exception(
290 error.ResponseError(
290 error.ResponseError(
291 _(b'unfulfilled batch command response'), None
291 _(b'unfulfilled batch command response'), None
292 )
292 )
293 )
293 )
294
294
295 self._futures = None
295 self._futures = None
296
296
297 def _readbatchresponse(self, states, wireresults):
297 def _readbatchresponse(self, states, wireresults):
298 # Executes in a thread to read data off the wire.
298 # Executes in a thread to read data off the wire.
299
299
300 for command, f, batchable, decode in states:
300 for command, f, batchable, decode in states:
301 # Grab raw result off the wire and teach the internal future
301 # Grab raw result off the wire and teach the internal future
302 # about it.
302 # about it.
303 try:
303 try:
304 remoteresult = next(wireresults)
304 remoteresult = next(wireresults)
305 except StopIteration:
305 except StopIteration:
306 # This can happen in particular because next(batchable)
306 # This can happen in particular because next(batchable)
307 # in the previous iteration can call peer._abort, which
307 # in the previous iteration can call peer._abort, which
308 # may close the peer.
308 # may close the peer.
309 f.set_exception(
309 f.set_exception(
310 error.ResponseError(
310 error.ResponseError(
311 _(b'unfulfilled batch command response'), None
311 _(b'unfulfilled batch command response'), None
312 )
312 )
313 )
313 )
314 else:
314 else:
315 try:
315 try:
316 result = decode(remoteresult)
316 result = decode(remoteresult)
317 except Exception:
317 except Exception:
318 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
318 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
319 else:
319 else:
320 f.set_result(result)
320 f.set_result(result)
321
321
322
322
323 @interfaceutil.implementer(
323 @interfaceutil.implementer(
324 repository.ipeercommands, repository.ipeerlegacycommands
324 repository.ipeercommands, repository.ipeerlegacycommands
325 )
325 )
326 class wirepeer(repository.peer):
326 class wirepeer(repository.peer):
327 """Client-side interface for communicating with a peer repository.
327 """Client-side interface for communicating with a peer repository.
328
328
329 Methods commonly call wire protocol commands of the same name.
329 Methods commonly call wire protocol commands of the same name.
330
330
331 See also httppeer.py and sshpeer.py for protocol-specific
331 See also httppeer.py and sshpeer.py for protocol-specific
332 implementations of this interface.
332 implementations of this interface.
333 """
333 """
334
334
335 def commandexecutor(self):
335 def commandexecutor(self):
336 return peerexecutor(self)
336 return peerexecutor(self)
337
337
338 # Begin of ipeercommands interface.
338 # Begin of ipeercommands interface.
339
339
340 def clonebundles(self):
340 def clonebundles(self):
341 self.requirecap(b'clonebundles', _(b'clone bundles'))
341 self.requirecap(b'clonebundles', _(b'clone bundles'))
342 return self._call(b'clonebundles')
342 return self._call(b'clonebundles')
343
343
344 @batchable
344 @batchable
345 def lookup(self, key):
345 def lookup(self, key):
346 self.requirecap(b'lookup', _(b'look up remote revision'))
346 self.requirecap(b'lookup', _(b'look up remote revision'))
347
347
348 def decode(d):
348 def decode(d):
349 success, data = d[:-1].split(b" ", 1)
349 success, data = d[:-1].split(b" ", 1)
350 if int(success):
350 if int(success):
351 return bin(data)
351 return bin(data)
352 else:
352 else:
353 self._abort(error.RepoError(data))
353 self._abort(error.RepoError(data))
354
354
355 return {b'key': encoding.fromlocal(key)}, decode
355 return {b'key': encoding.fromlocal(key)}, decode
356
356
357 @batchable
357 @batchable
358 def heads(self):
358 def heads(self):
359 def decode(d):
359 def decode(d):
360 try:
360 try:
361 return wireprototypes.decodelist(d[:-1])
361 return wireprototypes.decodelist(d[:-1])
362 except ValueError:
362 except ValueError:
363 self._abort(error.ResponseError(_(b"unexpected response:"), d))
363 self._abort(error.ResponseError(_(b"unexpected response:"), d))
364
364
365 return {}, decode
365 return {}, decode
366
366
367 @batchable
367 @batchable
368 def known(self, nodes):
368 def known(self, nodes):
369 def decode(d):
369 def decode(d):
370 try:
370 try:
371 return [bool(int(b)) for b in pycompat.iterbytestr(d)]
371 return [bool(int(b)) for b in pycompat.iterbytestr(d)]
372 except ValueError:
372 except ValueError:
373 self._abort(error.ResponseError(_(b"unexpected response:"), d))
373 self._abort(error.ResponseError(_(b"unexpected response:"), d))
374
374
375 return {b'nodes': wireprototypes.encodelist(nodes)}, decode
375 return {b'nodes': wireprototypes.encodelist(nodes)}, decode
376
376
377 @batchable
377 @batchable
378 def branchmap(self):
378 def branchmap(self):
379 def decode(d):
379 def decode(d):
380 try:
380 try:
381 branchmap = {}
381 branchmap = {}
382 for branchpart in d.splitlines():
382 for branchpart in d.splitlines():
383 branchname, branchheads = branchpart.split(b' ', 1)
383 branchname, branchheads = branchpart.split(b' ', 1)
384 branchname = encoding.tolocal(urlreq.unquote(branchname))
384 branchname = encoding.tolocal(urlreq.unquote(branchname))
385 branchheads = wireprototypes.decodelist(branchheads)
385 branchheads = wireprototypes.decodelist(branchheads)
386 branchmap[branchname] = branchheads
386 branchmap[branchname] = branchheads
387 return branchmap
387 return branchmap
388 except TypeError:
388 except TypeError:
389 self._abort(error.ResponseError(_(b"unexpected response:"), d))
389 self._abort(error.ResponseError(_(b"unexpected response:"), d))
390
390
391 return {}, decode
391 return {}, decode
392
392
393 @batchable
393 @batchable
394 def listkeys(self, namespace):
394 def listkeys(self, namespace):
395 if not self.capable(b'pushkey'):
395 if not self.capable(b'pushkey'):
396 return {}, None
396 return {}, None
397 self.ui.debug(b'preparing listkeys for "%s"\n' % namespace)
397 self.ui.debug(b'preparing listkeys for "%s"\n' % namespace)
398
398
399 def decode(d):
399 def decode(d):
400 self.ui.debug(
400 self.ui.debug(
401 b'received listkey for "%s": %i bytes\n' % (namespace, len(d))
401 b'received listkey for "%s": %i bytes\n' % (namespace, len(d))
402 )
402 )
403 return pushkeymod.decodekeys(d)
403 return pushkeymod.decodekeys(d)
404
404
405 return {b'namespace': encoding.fromlocal(namespace)}, decode
405 return {b'namespace': encoding.fromlocal(namespace)}, decode
406
406
407 @batchable
407 @batchable
408 def pushkey(self, namespace, key, old, new):
408 def pushkey(self, namespace, key, old, new):
409 if not self.capable(b'pushkey'):
409 if not self.capable(b'pushkey'):
410 return False, None
410 return False, None
411 self.ui.debug(b'preparing pushkey for "%s:%s"\n' % (namespace, key))
411 self.ui.debug(b'preparing pushkey for "%s:%s"\n' % (namespace, key))
412
412
413 def decode(d):
413 def decode(d):
414 d, output = d.split(b'\n', 1)
414 d, output = d.split(b'\n', 1)
415 try:
415 try:
416 d = bool(int(d))
416 d = bool(int(d))
417 except ValueError:
417 except ValueError:
418 raise error.ResponseError(
418 raise error.ResponseError(
419 _(b'push failed (unexpected response):'), d
419 _(b'push failed (unexpected response):'), d
420 )
420 )
421 for l in output.splitlines(True):
421 for l in output.splitlines(True):
422 self.ui.status(_(b'remote: '), l)
422 self.ui.status(_(b'remote: '), l)
423 return d
423 return d
424
424
425 return {
425 return {
426 b'namespace': encoding.fromlocal(namespace),
426 b'namespace': encoding.fromlocal(namespace),
427 b'key': encoding.fromlocal(key),
427 b'key': encoding.fromlocal(key),
428 b'old': encoding.fromlocal(old),
428 b'old': encoding.fromlocal(old),
429 b'new': encoding.fromlocal(new),
429 b'new': encoding.fromlocal(new),
430 }, decode
430 }, decode
431
431
432 def stream_out(self):
432 def stream_out(self):
433 return self._callstream(b'stream_out')
433 return self._callstream(b'stream_out')
434
434
435 def getbundle(self, source, **kwargs):
435 def getbundle(self, source, **kwargs):
436 kwargs = pycompat.byteskwargs(kwargs)
436 kwargs = pycompat.byteskwargs(kwargs)
437 self.requirecap(b'getbundle', _(b'look up remote changes'))
437 self.requirecap(b'getbundle', _(b'look up remote changes'))
438 opts = {}
438 opts = {}
439 bundlecaps = kwargs.get(b'bundlecaps') or set()
439 bundlecaps = kwargs.get(b'bundlecaps') or set()
440 for key, value in kwargs.items():
440 for key, value in kwargs.items():
441 if value is None:
441 if value is None:
442 continue
442 continue
443 keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
443 keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
444 if keytype is None:
444 if keytype is None:
445 raise error.ProgrammingError(
445 raise error.ProgrammingError(
446 b'Unexpectedly None keytype for key %s' % key
446 b'Unexpectedly None keytype for key %s' % key
447 )
447 )
448 elif keytype == b'nodes':
448 elif keytype == b'nodes':
449 value = wireprototypes.encodelist(value)
449 value = wireprototypes.encodelist(value)
450 elif keytype == b'csv':
450 elif keytype == b'csv':
451 value = b','.join(value)
451 value = b','.join(value)
452 elif keytype == b'scsv':
452 elif keytype == b'scsv':
453 value = b','.join(sorted(value))
453 value = b','.join(sorted(value))
454 elif keytype == b'boolean':
454 elif keytype == b'boolean':
455 value = b'%i' % bool(value)
455 value = b'%i' % bool(value)
456 elif keytype != b'plain':
456 elif keytype != b'plain':
457 raise KeyError(b'unknown getbundle option type %s' % keytype)
457 raise KeyError(b'unknown getbundle option type %s' % keytype)
458 opts[key] = value
458 opts[key] = value
459 f = self._callcompressable(b"getbundle", **pycompat.strkwargs(opts))
459 f = self._callcompressable(b"getbundle", **pycompat.strkwargs(opts))
460 if any((cap.startswith(b'HG2') for cap in bundlecaps)):
460 if any((cap.startswith(b'HG2') for cap in bundlecaps)):
461 return bundle2.getunbundler(self.ui, f)
461 return bundle2.getunbundler(self.ui, f)
462 else:
462 else:
463 return changegroupmod.cg1unpacker(f, b'UN')
463 return changegroupmod.cg1unpacker(f, b'UN')
464
464
465 def unbundle(self, bundle, heads, url):
465 def unbundle(self, bundle, heads, url):
466 """Send cg (a readable file-like object representing the
466 """Send cg (a readable file-like object representing the
467 changegroup to push, typically a chunkbuffer object) to the
467 changegroup to push, typically a chunkbuffer object) to the
468 remote server as a bundle.
468 remote server as a bundle.
469
469
470 When pushing a bundle10 stream, return an integer indicating the
470 When pushing a bundle10 stream, return an integer indicating the
471 result of the push (see changegroup.apply()).
471 result of the push (see changegroup.apply()).
472
472
473 When pushing a bundle20 stream, return a bundle20 stream.
473 When pushing a bundle20 stream, return a bundle20 stream.
474
474
475 `url` is the url the client thinks it's pushing to, which is
475 `url` is the url the client thinks it's pushing to, which is
476 visible to hooks.
476 visible to hooks.
477 """
477 """
478
478
479 if heads != [b'force'] and self.capable(b'unbundlehash'):
479 if heads != [b'force'] and self.capable(b'unbundlehash'):
480 heads = wireprototypes.encodelist(
480 heads = wireprototypes.encodelist(
481 [b'hashed', hashutil.sha1(b''.join(sorted(heads))).digest()]
481 [b'hashed', hashutil.sha1(b''.join(sorted(heads))).digest()]
482 )
482 )
483 else:
483 else:
484 heads = wireprototypes.encodelist(heads)
484 heads = wireprototypes.encodelist(heads)
485
485
486 if util.safehasattr(bundle, b'deltaheader'):
486 if util.safehasattr(bundle, 'deltaheader'):
487 # this a bundle10, do the old style call sequence
487 # this a bundle10, do the old style call sequence
488 ret, output = self._callpush(b"unbundle", bundle, heads=heads)
488 ret, output = self._callpush(b"unbundle", bundle, heads=heads)
489 if ret == b"":
489 if ret == b"":
490 raise error.ResponseError(_(b'push failed:'), output)
490 raise error.ResponseError(_(b'push failed:'), output)
491 try:
491 try:
492 ret = int(ret)
492 ret = int(ret)
493 except ValueError:
493 except ValueError:
494 raise error.ResponseError(
494 raise error.ResponseError(
495 _(b'push failed (unexpected response):'), ret
495 _(b'push failed (unexpected response):'), ret
496 )
496 )
497
497
498 for l in output.splitlines(True):
498 for l in output.splitlines(True):
499 self.ui.status(_(b'remote: '), l)
499 self.ui.status(_(b'remote: '), l)
500 else:
500 else:
501 # bundle2 push. Send a stream, fetch a stream.
501 # bundle2 push. Send a stream, fetch a stream.
502 stream = self._calltwowaystream(b'unbundle', bundle, heads=heads)
502 stream = self._calltwowaystream(b'unbundle', bundle, heads=heads)
503 ret = bundle2.getunbundler(self.ui, stream)
503 ret = bundle2.getunbundler(self.ui, stream)
504 return ret
504 return ret
505
505
506 # End of ipeercommands interface.
506 # End of ipeercommands interface.
507
507
508 # Begin of ipeerlegacycommands interface.
508 # Begin of ipeerlegacycommands interface.
509
509
510 def branches(self, nodes):
510 def branches(self, nodes):
511 n = wireprototypes.encodelist(nodes)
511 n = wireprototypes.encodelist(nodes)
512 d = self._call(b"branches", nodes=n)
512 d = self._call(b"branches", nodes=n)
513 try:
513 try:
514 br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
514 br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
515 return br
515 return br
516 except ValueError:
516 except ValueError:
517 self._abort(error.ResponseError(_(b"unexpected response:"), d))
517 self._abort(error.ResponseError(_(b"unexpected response:"), d))
518
518
519 def between(self, pairs):
519 def between(self, pairs):
520 batch = 8 # avoid giant requests
520 batch = 8 # avoid giant requests
521 r = []
521 r = []
522 for i in range(0, len(pairs), batch):
522 for i in range(0, len(pairs), batch):
523 n = b" ".join(
523 n = b" ".join(
524 [
524 [
525 wireprototypes.encodelist(p, b'-')
525 wireprototypes.encodelist(p, b'-')
526 for p in pairs[i : i + batch]
526 for p in pairs[i : i + batch]
527 ]
527 ]
528 )
528 )
529 d = self._call(b"between", pairs=n)
529 d = self._call(b"between", pairs=n)
530 try:
530 try:
531 r.extend(
531 r.extend(
532 l and wireprototypes.decodelist(l) or []
532 l and wireprototypes.decodelist(l) or []
533 for l in d.splitlines()
533 for l in d.splitlines()
534 )
534 )
535 except ValueError:
535 except ValueError:
536 self._abort(error.ResponseError(_(b"unexpected response:"), d))
536 self._abort(error.ResponseError(_(b"unexpected response:"), d))
537 return r
537 return r
538
538
539 def changegroup(self, nodes, source):
539 def changegroup(self, nodes, source):
540 n = wireprototypes.encodelist(nodes)
540 n = wireprototypes.encodelist(nodes)
541 f = self._callcompressable(b"changegroup", roots=n)
541 f = self._callcompressable(b"changegroup", roots=n)
542 return changegroupmod.cg1unpacker(f, b'UN')
542 return changegroupmod.cg1unpacker(f, b'UN')
543
543
544 def changegroupsubset(self, bases, heads, source):
544 def changegroupsubset(self, bases, heads, source):
545 self.requirecap(b'changegroupsubset', _(b'look up remote changes'))
545 self.requirecap(b'changegroupsubset', _(b'look up remote changes'))
546 bases = wireprototypes.encodelist(bases)
546 bases = wireprototypes.encodelist(bases)
547 heads = wireprototypes.encodelist(heads)
547 heads = wireprototypes.encodelist(heads)
548 f = self._callcompressable(
548 f = self._callcompressable(
549 b"changegroupsubset", bases=bases, heads=heads
549 b"changegroupsubset", bases=bases, heads=heads
550 )
550 )
551 return changegroupmod.cg1unpacker(f, b'UN')
551 return changegroupmod.cg1unpacker(f, b'UN')
552
552
553 # End of ipeerlegacycommands interface.
553 # End of ipeerlegacycommands interface.
554
554
555 def _submitbatch(self, req):
555 def _submitbatch(self, req):
556 """run batch request <req> on the server
556 """run batch request <req> on the server
557
557
558 Returns an iterator of the raw responses from the server.
558 Returns an iterator of the raw responses from the server.
559 """
559 """
560 ui = self.ui
560 ui = self.ui
561 if ui.debugflag and ui.configbool(b'devel', b'debug.peer-request'):
561 if ui.debugflag and ui.configbool(b'devel', b'debug.peer-request'):
562 ui.debug(b'devel-peer-request: batched-content\n')
562 ui.debug(b'devel-peer-request: batched-content\n')
563 for op, args in req:
563 for op, args in req:
564 msg = b'devel-peer-request: - %s (%d arguments)\n'
564 msg = b'devel-peer-request: - %s (%d arguments)\n'
565 ui.debug(msg % (op, len(args)))
565 ui.debug(msg % (op, len(args)))
566
566
567 unescapearg = wireprototypes.unescapebatcharg
567 unescapearg = wireprototypes.unescapebatcharg
568
568
569 rsp = self._callstream(b"batch", cmds=encodebatchcmds(req))
569 rsp = self._callstream(b"batch", cmds=encodebatchcmds(req))
570 chunk = rsp.read(1024)
570 chunk = rsp.read(1024)
571 work = [chunk]
571 work = [chunk]
572 while chunk:
572 while chunk:
573 while b';' not in chunk and chunk:
573 while b';' not in chunk and chunk:
574 chunk = rsp.read(1024)
574 chunk = rsp.read(1024)
575 work.append(chunk)
575 work.append(chunk)
576 merged = b''.join(work)
576 merged = b''.join(work)
577 while b';' in merged:
577 while b';' in merged:
578 one, merged = merged.split(b';', 1)
578 one, merged = merged.split(b';', 1)
579 yield unescapearg(one)
579 yield unescapearg(one)
580 chunk = rsp.read(1024)
580 chunk = rsp.read(1024)
581 work = [merged, chunk]
581 work = [merged, chunk]
582 yield unescapearg(b''.join(work))
582 yield unescapearg(b''.join(work))
583
583
584 def _submitone(self, op, args):
584 def _submitone(self, op, args):
585 return self._call(op, **pycompat.strkwargs(args))
585 return self._call(op, **pycompat.strkwargs(args))
586
586
587 def debugwireargs(self, one, two, three=None, four=None, five=None):
587 def debugwireargs(self, one, two, three=None, four=None, five=None):
588 # don't pass optional arguments left at their default value
588 # don't pass optional arguments left at their default value
589 opts = {}
589 opts = {}
590 if three is not None:
590 if three is not None:
591 opts['three'] = three
591 opts['three'] = three
592 if four is not None:
592 if four is not None:
593 opts['four'] = four
593 opts['four'] = four
594 return self._call(b'debugwireargs', one=one, two=two, **opts)
594 return self._call(b'debugwireargs', one=one, two=two, **opts)
595
595
596 def _call(self, cmd, **args):
596 def _call(self, cmd, **args):
597 """execute <cmd> on the server
597 """execute <cmd> on the server
598
598
599 The command is expected to return a simple string.
599 The command is expected to return a simple string.
600
600
601 returns the server reply as a string."""
601 returns the server reply as a string."""
602 raise NotImplementedError()
602 raise NotImplementedError()
603
603
604 def _callstream(self, cmd, **args):
604 def _callstream(self, cmd, **args):
605 """execute <cmd> on the server
605 """execute <cmd> on the server
606
606
607 The command is expected to return a stream. Note that if the
607 The command is expected to return a stream. Note that if the
608 command doesn't return a stream, _callstream behaves
608 command doesn't return a stream, _callstream behaves
609 differently for ssh and http peers.
609 differently for ssh and http peers.
610
610
611 returns the server reply as a file like object.
611 returns the server reply as a file like object.
612 """
612 """
613 raise NotImplementedError()
613 raise NotImplementedError()
614
614
615 def _callcompressable(self, cmd, **args):
615 def _callcompressable(self, cmd, **args):
616 """execute <cmd> on the server
616 """execute <cmd> on the server
617
617
618 The command is expected to return a stream.
618 The command is expected to return a stream.
619
619
620 The stream may have been compressed in some implementations. This
620 The stream may have been compressed in some implementations. This
621 function takes care of the decompression. This is the only difference
621 function takes care of the decompression. This is the only difference
622 with _callstream.
622 with _callstream.
623
623
624 returns the server reply as a file like object.
624 returns the server reply as a file like object.
625 """
625 """
626 raise NotImplementedError()
626 raise NotImplementedError()
627
627
628 def _callpush(self, cmd, fp, **args):
628 def _callpush(self, cmd, fp, **args):
629 """execute a <cmd> on server
629 """execute a <cmd> on server
630
630
631 The command is expected to be related to a push. Push has a special
631 The command is expected to be related to a push. Push has a special
632 return method.
632 return method.
633
633
634 returns the server reply as a (ret, output) tuple. ret is either
634 returns the server reply as a (ret, output) tuple. ret is either
635 empty (error) or a stringified int.
635 empty (error) or a stringified int.
636 """
636 """
637 raise NotImplementedError()
637 raise NotImplementedError()
638
638
639 def _calltwowaystream(self, cmd, fp, **args):
639 def _calltwowaystream(self, cmd, fp, **args):
640 """execute <cmd> on server
640 """execute <cmd> on server
641
641
642 The command will send a stream to the server and get a stream in reply.
642 The command will send a stream to the server and get a stream in reply.
643 """
643 """
644 raise NotImplementedError()
644 raise NotImplementedError()
645
645
646 def _abort(self, exception):
646 def _abort(self, exception):
647 """clearly abort the wire protocol connection and raise the exception"""
647 """clearly abort the wire protocol connection and raise the exception"""
648 raise NotImplementedError()
648 raise NotImplementedError()
General Comments 0
You need to be logged in to leave comments. Login now