##// END OF EJS Templates
wireproto: in batch queries, support queries with immediate responses...
Valentin Gatien-Baron -
r41085:55e8da48 default
parent child Browse files
Show More
@@ -0,0 +1,46 b''
1 Checking how hg behaves when one side of a pull/push doesn't support
2 some capability (because it's running an older hg version, usually).
3
4 $ hg init repo1
5 $ cd repo1
6 $ echo a > a; hg add -q a; hg commit -q -m a
7 $ hg bookmark a
8 $ hg clone -q . ../repo2
9 $ cd ../repo2
10
11 $ touch $TESTTMP/disable-lookup.py
12 $ disable_cap() {
13 > rm -f $TESTTMP/disable-lookup.pyc # pyc caching is buggy
14 > cat <<EOF > $TESTTMP/disable-lookup.py
15 > from mercurial import extensions, wireprotov1server
16 > def wcapabilities(orig, *args, **kwargs):
17 > cap = orig(*args, **kwargs)
18 > cap.remove('$1')
19 > return cap
20 > extensions.wrapfunction(wireprotov1server, '_capabilities', wcapabilities)
21 > EOF
22 > }
23 $ cat >> ../repo1/.hg/hgrc <<EOF
24 > [extensions]
25 > disable-lookup = $TESTTMP/disable-lookup.py
26 > EOF
27 $ cat >> .hg/hgrc <<EOF
28 > [ui]
29 > ssh = "$PYTHON" "$TESTDIR/dummyssh"
30 > EOF
31
32 $ hg pull ssh://user@dummy/repo1 -r tip -B a
33 pulling from ssh://user@dummy/repo1
34 no changes found
35
36 $ disable_cap lookup
37 $ hg pull ssh://user@dummy/repo1 -r tip -B a
38 pulling from ssh://user@dummy/repo1
39 abort: other repository doesn't support revision lookup, so a rev cannot be specified.
40 [255]
41
42 $ disable_cap pushkey
43 $ hg pull ssh://user@dummy/repo1 -r tip -B a
44 pulling from ssh://user@dummy/repo1
45 abort: remote bookmark a not found!
46 [255]
@@ -1,621 +1,624 b''
1 # wireprotov1peer.py - Client-side functionality for wire protocol version 1.
1 # wireprotov1peer.py - Client-side functionality for wire protocol version 1.
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import sys
11 import sys
12 import weakref
12 import weakref
13
13
14 from .i18n import _
14 from .i18n import _
15 from .node import (
15 from .node import (
16 bin,
16 bin,
17 )
17 )
18 from . import (
18 from . import (
19 bundle2,
19 bundle2,
20 changegroup as changegroupmod,
20 changegroup as changegroupmod,
21 encoding,
21 encoding,
22 error,
22 error,
23 pushkey as pushkeymod,
23 pushkey as pushkeymod,
24 pycompat,
24 pycompat,
25 repository,
25 repository,
26 util,
26 util,
27 wireprototypes,
27 wireprototypes,
28 )
28 )
29 from .utils import (
29 from .utils import (
30 interfaceutil,
30 interfaceutil,
31 )
31 )
32
32
33 urlreq = util.urlreq
33 urlreq = util.urlreq
34
34
35 def batchable(f):
35 def batchable(f):
36 '''annotation for batchable methods
36 '''annotation for batchable methods
37
37
38 Such methods must implement a coroutine as follows:
38 Such methods must implement a coroutine as follows:
39
39
40 @batchable
40 @batchable
41 def sample(self, one, two=None):
41 def sample(self, one, two=None):
42 # Build list of encoded arguments suitable for your wire protocol:
42 # Build list of encoded arguments suitable for your wire protocol:
43 encargs = [('one', encode(one),), ('two', encode(two),)]
43 encargs = [('one', encode(one),), ('two', encode(two),)]
44 # Create future for injection of encoded result:
44 # Create future for injection of encoded result:
45 encresref = future()
45 encresref = future()
46 # Return encoded arguments and future:
46 # Return encoded arguments and future:
47 yield encargs, encresref
47 yield encargs, encresref
48 # Assuming the future to be filled with the result from the batched
48 # Assuming the future to be filled with the result from the batched
49 # request now. Decode it:
49 # request now. Decode it:
50 yield decode(encresref.value)
50 yield decode(encresref.value)
51
51
52 The decorator returns a function which wraps this coroutine as a plain
52 The decorator returns a function which wraps this coroutine as a plain
53 method, but adds the original method as an attribute called "batchable",
53 method, but adds the original method as an attribute called "batchable",
54 which is used by remotebatch to split the call into separate encoding and
54 which is used by remotebatch to split the call into separate encoding and
55 decoding phases.
55 decoding phases.
56 '''
56 '''
57 def plain(*args, **opts):
57 def plain(*args, **opts):
58 batchable = f(*args, **opts)
58 batchable = f(*args, **opts)
59 encargsorres, encresref = next(batchable)
59 encargsorres, encresref = next(batchable)
60 if not encresref:
60 if not encresref:
61 return encargsorres # a local result in this case
61 return encargsorres # a local result in this case
62 self = args[0]
62 self = args[0]
63 cmd = pycompat.bytesurl(f.__name__) # ensure cmd is ascii bytestr
63 cmd = pycompat.bytesurl(f.__name__) # ensure cmd is ascii bytestr
64 encresref.set(self._submitone(cmd, encargsorres))
64 encresref.set(self._submitone(cmd, encargsorres))
65 return next(batchable)
65 return next(batchable)
66 setattr(plain, 'batchable', f)
66 setattr(plain, 'batchable', f)
67 setattr(plain, '__name__', f.__name__)
67 setattr(plain, '__name__', f.__name__)
68 return plain
68 return plain
69
69
70 class future(object):
70 class future(object):
71 '''placeholder for a value to be set later'''
71 '''placeholder for a value to be set later'''
72 def set(self, value):
72 def set(self, value):
73 if util.safehasattr(self, 'value'):
73 if util.safehasattr(self, 'value'):
74 raise error.RepoError("future is already set")
74 raise error.RepoError("future is already set")
75 self.value = value
75 self.value = value
76
76
77 def encodebatchcmds(req):
77 def encodebatchcmds(req):
78 """Return a ``cmds`` argument value for the ``batch`` command."""
78 """Return a ``cmds`` argument value for the ``batch`` command."""
79 escapearg = wireprototypes.escapebatcharg
79 escapearg = wireprototypes.escapebatcharg
80
80
81 cmds = []
81 cmds = []
82 for op, argsdict in req:
82 for op, argsdict in req:
83 # Old servers didn't properly unescape argument names. So prevent
83 # Old servers didn't properly unescape argument names. So prevent
84 # the sending of argument names that may not be decoded properly by
84 # the sending of argument names that may not be decoded properly by
85 # servers.
85 # servers.
86 assert all(escapearg(k) == k for k in argsdict)
86 assert all(escapearg(k) == k for k in argsdict)
87
87
88 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
88 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
89 for k, v in argsdict.iteritems())
89 for k, v in argsdict.iteritems())
90 cmds.append('%s %s' % (op, args))
90 cmds.append('%s %s' % (op, args))
91
91
92 return ';'.join(cmds)
92 return ';'.join(cmds)
93
93
94 class unsentfuture(pycompat.futures.Future):
94 class unsentfuture(pycompat.futures.Future):
95 """A Future variation to represent an unsent command.
95 """A Future variation to represent an unsent command.
96
96
97 Because we buffer commands and don't submit them immediately, calling
97 Because we buffer commands and don't submit them immediately, calling
98 ``result()`` on an unsent future could deadlock. Futures for buffered
98 ``result()`` on an unsent future could deadlock. Futures for buffered
99 commands are represented by this type, which wraps ``result()`` to
99 commands are represented by this type, which wraps ``result()`` to
100 call ``sendcommands()``.
100 call ``sendcommands()``.
101 """
101 """
102
102
103 def result(self, timeout=None):
103 def result(self, timeout=None):
104 if self.done():
104 if self.done():
105 return pycompat.futures.Future.result(self, timeout)
105 return pycompat.futures.Future.result(self, timeout)
106
106
107 self._peerexecutor.sendcommands()
107 self._peerexecutor.sendcommands()
108
108
109 # This looks like it will infinitely recurse. However,
109 # This looks like it will infinitely recurse. However,
110 # sendcommands() should modify __class__. This call serves as a check
110 # sendcommands() should modify __class__. This call serves as a check
111 # on that.
111 # on that.
112 return self.result(timeout)
112 return self.result(timeout)
113
113
114 @interfaceutil.implementer(repository.ipeercommandexecutor)
114 @interfaceutil.implementer(repository.ipeercommandexecutor)
115 class peerexecutor(object):
115 class peerexecutor(object):
116 def __init__(self, peer):
116 def __init__(self, peer):
117 self._peer = peer
117 self._peer = peer
118 self._sent = False
118 self._sent = False
119 self._closed = False
119 self._closed = False
120 self._calls = []
120 self._calls = []
121 self._futures = weakref.WeakSet()
121 self._futures = weakref.WeakSet()
122 self._responseexecutor = None
122 self._responseexecutor = None
123 self._responsef = None
123 self._responsef = None
124
124
125 def __enter__(self):
125 def __enter__(self):
126 return self
126 return self
127
127
128 def __exit__(self, exctype, excvalee, exctb):
128 def __exit__(self, exctype, excvalee, exctb):
129 self.close()
129 self.close()
130
130
131 def callcommand(self, command, args):
131 def callcommand(self, command, args):
132 if self._sent:
132 if self._sent:
133 raise error.ProgrammingError('callcommand() cannot be used '
133 raise error.ProgrammingError('callcommand() cannot be used '
134 'after commands are sent')
134 'after commands are sent')
135
135
136 if self._closed:
136 if self._closed:
137 raise error.ProgrammingError('callcommand() cannot be used '
137 raise error.ProgrammingError('callcommand() cannot be used '
138 'after close()')
138 'after close()')
139
139
140 # Commands are dispatched through methods on the peer.
140 # Commands are dispatched through methods on the peer.
141 fn = getattr(self._peer, pycompat.sysstr(command), None)
141 fn = getattr(self._peer, pycompat.sysstr(command), None)
142
142
143 if not fn:
143 if not fn:
144 raise error.ProgrammingError(
144 raise error.ProgrammingError(
145 'cannot call command %s: method of same name not available '
145 'cannot call command %s: method of same name not available '
146 'on peer' % command)
146 'on peer' % command)
147
147
148 # Commands are either batchable or they aren't. If a command
148 # Commands are either batchable or they aren't. If a command
149 # isn't batchable, we send it immediately because the executor
149 # isn't batchable, we send it immediately because the executor
150 # can no longer accept new commands after a non-batchable command.
150 # can no longer accept new commands after a non-batchable command.
151 # If a command is batchable, we queue it for later. But we have
151 # If a command is batchable, we queue it for later. But we have
152 # to account for the case of a non-batchable command arriving after
152 # to account for the case of a non-batchable command arriving after
153 # a batchable one and refuse to service it.
153 # a batchable one and refuse to service it.
154
154
155 def addcall():
155 def addcall():
156 f = pycompat.futures.Future()
156 f = pycompat.futures.Future()
157 self._futures.add(f)
157 self._futures.add(f)
158 self._calls.append((command, args, fn, f))
158 self._calls.append((command, args, fn, f))
159 return f
159 return f
160
160
161 if getattr(fn, 'batchable', False):
161 if getattr(fn, 'batchable', False):
162 f = addcall()
162 f = addcall()
163
163
164 # But since we don't issue it immediately, we wrap its result()
164 # But since we don't issue it immediately, we wrap its result()
165 # to trigger sending so we avoid deadlocks.
165 # to trigger sending so we avoid deadlocks.
166 f.__class__ = unsentfuture
166 f.__class__ = unsentfuture
167 f._peerexecutor = self
167 f._peerexecutor = self
168 else:
168 else:
169 if self._calls:
169 if self._calls:
170 raise error.ProgrammingError(
170 raise error.ProgrammingError(
171 '%s is not batchable and cannot be called on a command '
171 '%s is not batchable and cannot be called on a command '
172 'executor along with other commands' % command)
172 'executor along with other commands' % command)
173
173
174 f = addcall()
174 f = addcall()
175
175
176 # Non-batchable commands can never coexist with another command
176 # Non-batchable commands can never coexist with another command
177 # in this executor. So send the command immediately.
177 # in this executor. So send the command immediately.
178 self.sendcommands()
178 self.sendcommands()
179
179
180 return f
180 return f
181
181
182 def sendcommands(self):
182 def sendcommands(self):
183 if self._sent:
183 if self._sent:
184 return
184 return
185
185
186 if not self._calls:
186 if not self._calls:
187 return
187 return
188
188
189 self._sent = True
189 self._sent = True
190
190
191 # Unhack any future types so caller seens a clean type and to break
191 # Unhack any future types so caller seens a clean type and to break
192 # cycle between us and futures.
192 # cycle between us and futures.
193 for f in self._futures:
193 for f in self._futures:
194 if isinstance(f, unsentfuture):
194 if isinstance(f, unsentfuture):
195 f.__class__ = pycompat.futures.Future
195 f.__class__ = pycompat.futures.Future
196 f._peerexecutor = None
196 f._peerexecutor = None
197
197
198 calls = self._calls
198 calls = self._calls
199 # Mainly to destroy references to futures.
199 # Mainly to destroy references to futures.
200 self._calls = None
200 self._calls = None
201
201
202 # Simple case of a single command. We call it synchronously.
202 # Simple case of a single command. We call it synchronously.
203 if len(calls) == 1:
203 if len(calls) == 1:
204 command, args, fn, f = calls[0]
204 command, args, fn, f = calls[0]
205
205
206 # Future was cancelled. Ignore it.
206 # Future was cancelled. Ignore it.
207 if not f.set_running_or_notify_cancel():
207 if not f.set_running_or_notify_cancel():
208 return
208 return
209
209
210 try:
210 try:
211 result = fn(**pycompat.strkwargs(args))
211 result = fn(**pycompat.strkwargs(args))
212 except Exception:
212 except Exception:
213 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
213 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
214 else:
214 else:
215 f.set_result(result)
215 f.set_result(result)
216
216
217 return
217 return
218
218
219 # Batch commands are a bit harder. First, we have to deal with the
219 # Batch commands are a bit harder. First, we have to deal with the
220 # @batchable coroutine. That's a bit annoying. Furthermore, we also
220 # @batchable coroutine. That's a bit annoying. Furthermore, we also
221 # need to preserve streaming. i.e. it should be possible for the
221 # need to preserve streaming. i.e. it should be possible for the
222 # futures to resolve as data is coming in off the wire without having
222 # futures to resolve as data is coming in off the wire without having
223 # to wait for the final byte of the final response. We do this by
223 # to wait for the final byte of the final response. We do this by
224 # spinning up a thread to read the responses.
224 # spinning up a thread to read the responses.
225
225
226 requests = []
226 requests = []
227 states = []
227 states = []
228
228
229 for command, args, fn, f in calls:
229 for command, args, fn, f in calls:
230 # Future was cancelled. Ignore it.
230 # Future was cancelled. Ignore it.
231 if not f.set_running_or_notify_cancel():
231 if not f.set_running_or_notify_cancel():
232 continue
232 continue
233
233
234 try:
234 try:
235 batchable = fn.batchable(fn.__self__,
235 batchable = fn.batchable(fn.__self__,
236 **pycompat.strkwargs(args))
236 **pycompat.strkwargs(args))
237 except Exception:
237 except Exception:
238 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
238 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
239 return
239 return
240
240
241 # Encoded arguments and future holding remote result.
241 # Encoded arguments and future holding remote result.
242 try:
242 try:
243 encodedargs, fremote = next(batchable)
243 encargsorres, fremote = next(batchable)
244 except Exception:
244 except Exception:
245 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
245 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
246 return
246 return
247
247
248 requests.append((command, encodedargs))
248 if not fremote:
249 states.append((command, f, batchable, fremote))
249 f.set_result(encargsorres)
250 else:
251 requests.append((command, encargsorres))
252 states.append((command, f, batchable, fremote))
250
253
251 if not requests:
254 if not requests:
252 return
255 return
253
256
254 # This will emit responses in order they were executed.
257 # This will emit responses in order they were executed.
255 wireresults = self._peer._submitbatch(requests)
258 wireresults = self._peer._submitbatch(requests)
256
259
257 # The use of a thread pool executor here is a bit weird for something
260 # The use of a thread pool executor here is a bit weird for something
258 # that only spins up a single thread. However, thread management is
261 # that only spins up a single thread. However, thread management is
259 # hard and it is easy to encounter race conditions, deadlocks, etc.
262 # hard and it is easy to encounter race conditions, deadlocks, etc.
260 # concurrent.futures already solves these problems and its thread pool
263 # concurrent.futures already solves these problems and its thread pool
261 # executor has minimal overhead. So we use it.
264 # executor has minimal overhead. So we use it.
262 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
265 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
263 self._responsef = self._responseexecutor.submit(self._readbatchresponse,
266 self._responsef = self._responseexecutor.submit(self._readbatchresponse,
264 states, wireresults)
267 states, wireresults)
265
268
266 def close(self):
269 def close(self):
267 self.sendcommands()
270 self.sendcommands()
268
271
269 if self._closed:
272 if self._closed:
270 return
273 return
271
274
272 self._closed = True
275 self._closed = True
273
276
274 if not self._responsef:
277 if not self._responsef:
275 return
278 return
276
279
277 # We need to wait on our in-flight response and then shut down the
280 # We need to wait on our in-flight response and then shut down the
278 # executor once we have a result.
281 # executor once we have a result.
279 try:
282 try:
280 self._responsef.result()
283 self._responsef.result()
281 finally:
284 finally:
282 self._responseexecutor.shutdown(wait=True)
285 self._responseexecutor.shutdown(wait=True)
283 self._responsef = None
286 self._responsef = None
284 self._responseexecutor = None
287 self._responseexecutor = None
285
288
286 # If any of our futures are still in progress, mark them as
289 # If any of our futures are still in progress, mark them as
287 # errored. Otherwise a result() could wait indefinitely.
290 # errored. Otherwise a result() could wait indefinitely.
288 for f in self._futures:
291 for f in self._futures:
289 if not f.done():
292 if not f.done():
290 f.set_exception(error.ResponseError(
293 f.set_exception(error.ResponseError(
291 _('unfulfilled batch command response')))
294 _('unfulfilled batch command response')))
292
295
293 self._futures = None
296 self._futures = None
294
297
295 def _readbatchresponse(self, states, wireresults):
298 def _readbatchresponse(self, states, wireresults):
296 # Executes in a thread to read data off the wire.
299 # Executes in a thread to read data off the wire.
297
300
298 for command, f, batchable, fremote in states:
301 for command, f, batchable, fremote in states:
299 # Grab raw result off the wire and teach the internal future
302 # Grab raw result off the wire and teach the internal future
300 # about it.
303 # about it.
301 remoteresult = next(wireresults)
304 remoteresult = next(wireresults)
302 fremote.set(remoteresult)
305 fremote.set(remoteresult)
303
306
304 # And ask the coroutine to decode that value.
307 # And ask the coroutine to decode that value.
305 try:
308 try:
306 result = next(batchable)
309 result = next(batchable)
307 except Exception:
310 except Exception:
308 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
311 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
309 else:
312 else:
310 f.set_result(result)
313 f.set_result(result)
311
314
312 @interfaceutil.implementer(repository.ipeercommands,
315 @interfaceutil.implementer(repository.ipeercommands,
313 repository.ipeerlegacycommands)
316 repository.ipeerlegacycommands)
314 class wirepeer(repository.peer):
317 class wirepeer(repository.peer):
315 """Client-side interface for communicating with a peer repository.
318 """Client-side interface for communicating with a peer repository.
316
319
317 Methods commonly call wire protocol commands of the same name.
320 Methods commonly call wire protocol commands of the same name.
318
321
319 See also httppeer.py and sshpeer.py for protocol-specific
322 See also httppeer.py and sshpeer.py for protocol-specific
320 implementations of this interface.
323 implementations of this interface.
321 """
324 """
322 def commandexecutor(self):
325 def commandexecutor(self):
323 return peerexecutor(self)
326 return peerexecutor(self)
324
327
325 # Begin of ipeercommands interface.
328 # Begin of ipeercommands interface.
326
329
327 def clonebundles(self):
330 def clonebundles(self):
328 self.requirecap('clonebundles', _('clone bundles'))
331 self.requirecap('clonebundles', _('clone bundles'))
329 return self._call('clonebundles')
332 return self._call('clonebundles')
330
333
331 @batchable
334 @batchable
332 def lookup(self, key):
335 def lookup(self, key):
333 self.requirecap('lookup', _('look up remote revision'))
336 self.requirecap('lookup', _('look up remote revision'))
334 f = future()
337 f = future()
335 yield {'key': encoding.fromlocal(key)}, f
338 yield {'key': encoding.fromlocal(key)}, f
336 d = f.value
339 d = f.value
337 success, data = d[:-1].split(" ", 1)
340 success, data = d[:-1].split(" ", 1)
338 if int(success):
341 if int(success):
339 yield bin(data)
342 yield bin(data)
340 else:
343 else:
341 self._abort(error.RepoError(data))
344 self._abort(error.RepoError(data))
342
345
343 @batchable
346 @batchable
344 def heads(self):
347 def heads(self):
345 f = future()
348 f = future()
346 yield {}, f
349 yield {}, f
347 d = f.value
350 d = f.value
348 try:
351 try:
349 yield wireprototypes.decodelist(d[:-1])
352 yield wireprototypes.decodelist(d[:-1])
350 except ValueError:
353 except ValueError:
351 self._abort(error.ResponseError(_("unexpected response:"), d))
354 self._abort(error.ResponseError(_("unexpected response:"), d))
352
355
353 @batchable
356 @batchable
354 def known(self, nodes):
357 def known(self, nodes):
355 f = future()
358 f = future()
356 yield {'nodes': wireprototypes.encodelist(nodes)}, f
359 yield {'nodes': wireprototypes.encodelist(nodes)}, f
357 d = f.value
360 d = f.value
358 try:
361 try:
359 yield [bool(int(b)) for b in pycompat.iterbytestr(d)]
362 yield [bool(int(b)) for b in pycompat.iterbytestr(d)]
360 except ValueError:
363 except ValueError:
361 self._abort(error.ResponseError(_("unexpected response:"), d))
364 self._abort(error.ResponseError(_("unexpected response:"), d))
362
365
363 @batchable
366 @batchable
364 def branchmap(self):
367 def branchmap(self):
365 f = future()
368 f = future()
366 yield {}, f
369 yield {}, f
367 d = f.value
370 d = f.value
368 try:
371 try:
369 branchmap = {}
372 branchmap = {}
370 for branchpart in d.splitlines():
373 for branchpart in d.splitlines():
371 branchname, branchheads = branchpart.split(' ', 1)
374 branchname, branchheads = branchpart.split(' ', 1)
372 branchname = encoding.tolocal(urlreq.unquote(branchname))
375 branchname = encoding.tolocal(urlreq.unquote(branchname))
373 branchheads = wireprototypes.decodelist(branchheads)
376 branchheads = wireprototypes.decodelist(branchheads)
374 branchmap[branchname] = branchheads
377 branchmap[branchname] = branchheads
375 yield branchmap
378 yield branchmap
376 except TypeError:
379 except TypeError:
377 self._abort(error.ResponseError(_("unexpected response:"), d))
380 self._abort(error.ResponseError(_("unexpected response:"), d))
378
381
379 @batchable
382 @batchable
380 def listkeys(self, namespace):
383 def listkeys(self, namespace):
381 if not self.capable('pushkey'):
384 if not self.capable('pushkey'):
382 yield {}, None
385 yield {}, None
383 f = future()
386 f = future()
384 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
387 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
385 yield {'namespace': encoding.fromlocal(namespace)}, f
388 yield {'namespace': encoding.fromlocal(namespace)}, f
386 d = f.value
389 d = f.value
387 self.ui.debug('received listkey for "%s": %i bytes\n'
390 self.ui.debug('received listkey for "%s": %i bytes\n'
388 % (namespace, len(d)))
391 % (namespace, len(d)))
389 yield pushkeymod.decodekeys(d)
392 yield pushkeymod.decodekeys(d)
390
393
391 @batchable
394 @batchable
392 def pushkey(self, namespace, key, old, new):
395 def pushkey(self, namespace, key, old, new):
393 if not self.capable('pushkey'):
396 if not self.capable('pushkey'):
394 yield False, None
397 yield False, None
395 f = future()
398 f = future()
396 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
399 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
397 yield {'namespace': encoding.fromlocal(namespace),
400 yield {'namespace': encoding.fromlocal(namespace),
398 'key': encoding.fromlocal(key),
401 'key': encoding.fromlocal(key),
399 'old': encoding.fromlocal(old),
402 'old': encoding.fromlocal(old),
400 'new': encoding.fromlocal(new)}, f
403 'new': encoding.fromlocal(new)}, f
401 d = f.value
404 d = f.value
402 d, output = d.split('\n', 1)
405 d, output = d.split('\n', 1)
403 try:
406 try:
404 d = bool(int(d))
407 d = bool(int(d))
405 except ValueError:
408 except ValueError:
406 raise error.ResponseError(
409 raise error.ResponseError(
407 _('push failed (unexpected response):'), d)
410 _('push failed (unexpected response):'), d)
408 for l in output.splitlines(True):
411 for l in output.splitlines(True):
409 self.ui.status(_('remote: '), l)
412 self.ui.status(_('remote: '), l)
410 yield d
413 yield d
411
414
412 def stream_out(self):
415 def stream_out(self):
413 return self._callstream('stream_out')
416 return self._callstream('stream_out')
414
417
415 def getbundle(self, source, **kwargs):
418 def getbundle(self, source, **kwargs):
416 kwargs = pycompat.byteskwargs(kwargs)
419 kwargs = pycompat.byteskwargs(kwargs)
417 self.requirecap('getbundle', _('look up remote changes'))
420 self.requirecap('getbundle', _('look up remote changes'))
418 opts = {}
421 opts = {}
419 bundlecaps = kwargs.get('bundlecaps') or set()
422 bundlecaps = kwargs.get('bundlecaps') or set()
420 for key, value in kwargs.iteritems():
423 for key, value in kwargs.iteritems():
421 if value is None:
424 if value is None:
422 continue
425 continue
423 keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
426 keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
424 if keytype is None:
427 if keytype is None:
425 raise error.ProgrammingError(
428 raise error.ProgrammingError(
426 'Unexpectedly None keytype for key %s' % key)
429 'Unexpectedly None keytype for key %s' % key)
427 elif keytype == 'nodes':
430 elif keytype == 'nodes':
428 value = wireprototypes.encodelist(value)
431 value = wireprototypes.encodelist(value)
429 elif keytype == 'csv':
432 elif keytype == 'csv':
430 value = ','.join(value)
433 value = ','.join(value)
431 elif keytype == 'scsv':
434 elif keytype == 'scsv':
432 value = ','.join(sorted(value))
435 value = ','.join(sorted(value))
433 elif keytype == 'boolean':
436 elif keytype == 'boolean':
434 value = '%i' % bool(value)
437 value = '%i' % bool(value)
435 elif keytype != 'plain':
438 elif keytype != 'plain':
436 raise KeyError('unknown getbundle option type %s'
439 raise KeyError('unknown getbundle option type %s'
437 % keytype)
440 % keytype)
438 opts[key] = value
441 opts[key] = value
439 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
442 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
440 if any((cap.startswith('HG2') for cap in bundlecaps)):
443 if any((cap.startswith('HG2') for cap in bundlecaps)):
441 return bundle2.getunbundler(self.ui, f)
444 return bundle2.getunbundler(self.ui, f)
442 else:
445 else:
443 return changegroupmod.cg1unpacker(f, 'UN')
446 return changegroupmod.cg1unpacker(f, 'UN')
444
447
445 def unbundle(self, bundle, heads, url):
448 def unbundle(self, bundle, heads, url):
446 '''Send cg (a readable file-like object representing the
449 '''Send cg (a readable file-like object representing the
447 changegroup to push, typically a chunkbuffer object) to the
450 changegroup to push, typically a chunkbuffer object) to the
448 remote server as a bundle.
451 remote server as a bundle.
449
452
450 When pushing a bundle10 stream, return an integer indicating the
453 When pushing a bundle10 stream, return an integer indicating the
451 result of the push (see changegroup.apply()).
454 result of the push (see changegroup.apply()).
452
455
453 When pushing a bundle20 stream, return a bundle20 stream.
456 When pushing a bundle20 stream, return a bundle20 stream.
454
457
455 `url` is the url the client thinks it's pushing to, which is
458 `url` is the url the client thinks it's pushing to, which is
456 visible to hooks.
459 visible to hooks.
457 '''
460 '''
458
461
459 if heads != ['force'] and self.capable('unbundlehash'):
462 if heads != ['force'] and self.capable('unbundlehash'):
460 heads = wireprototypes.encodelist(
463 heads = wireprototypes.encodelist(
461 ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()])
464 ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()])
462 else:
465 else:
463 heads = wireprototypes.encodelist(heads)
466 heads = wireprototypes.encodelist(heads)
464
467
465 if util.safehasattr(bundle, 'deltaheader'):
468 if util.safehasattr(bundle, 'deltaheader'):
466 # this a bundle10, do the old style call sequence
469 # this a bundle10, do the old style call sequence
467 ret, output = self._callpush("unbundle", bundle, heads=heads)
470 ret, output = self._callpush("unbundle", bundle, heads=heads)
468 if ret == "":
471 if ret == "":
469 raise error.ResponseError(
472 raise error.ResponseError(
470 _('push failed:'), output)
473 _('push failed:'), output)
471 try:
474 try:
472 ret = int(ret)
475 ret = int(ret)
473 except ValueError:
476 except ValueError:
474 raise error.ResponseError(
477 raise error.ResponseError(
475 _('push failed (unexpected response):'), ret)
478 _('push failed (unexpected response):'), ret)
476
479
477 for l in output.splitlines(True):
480 for l in output.splitlines(True):
478 self.ui.status(_('remote: '), l)
481 self.ui.status(_('remote: '), l)
479 else:
482 else:
480 # bundle2 push. Send a stream, fetch a stream.
483 # bundle2 push. Send a stream, fetch a stream.
481 stream = self._calltwowaystream('unbundle', bundle, heads=heads)
484 stream = self._calltwowaystream('unbundle', bundle, heads=heads)
482 ret = bundle2.getunbundler(self.ui, stream)
485 ret = bundle2.getunbundler(self.ui, stream)
483 return ret
486 return ret
484
487
485 # End of ipeercommands interface.
488 # End of ipeercommands interface.
486
489
487 # Begin of ipeerlegacycommands interface.
490 # Begin of ipeerlegacycommands interface.
488
491
489 def branches(self, nodes):
492 def branches(self, nodes):
490 n = wireprototypes.encodelist(nodes)
493 n = wireprototypes.encodelist(nodes)
491 d = self._call("branches", nodes=n)
494 d = self._call("branches", nodes=n)
492 try:
495 try:
493 br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
496 br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
494 return br
497 return br
495 except ValueError:
498 except ValueError:
496 self._abort(error.ResponseError(_("unexpected response:"), d))
499 self._abort(error.ResponseError(_("unexpected response:"), d))
497
500
498 def between(self, pairs):
501 def between(self, pairs):
499 batch = 8 # avoid giant requests
502 batch = 8 # avoid giant requests
500 r = []
503 r = []
501 for i in pycompat.xrange(0, len(pairs), batch):
504 for i in pycompat.xrange(0, len(pairs), batch):
502 n = " ".join([wireprototypes.encodelist(p, '-')
505 n = " ".join([wireprototypes.encodelist(p, '-')
503 for p in pairs[i:i + batch]])
506 for p in pairs[i:i + batch]])
504 d = self._call("between", pairs=n)
507 d = self._call("between", pairs=n)
505 try:
508 try:
506 r.extend(l and wireprototypes.decodelist(l) or []
509 r.extend(l and wireprototypes.decodelist(l) or []
507 for l in d.splitlines())
510 for l in d.splitlines())
508 except ValueError:
511 except ValueError:
509 self._abort(error.ResponseError(_("unexpected response:"), d))
512 self._abort(error.ResponseError(_("unexpected response:"), d))
510 return r
513 return r
511
514
512 def changegroup(self, nodes, source):
515 def changegroup(self, nodes, source):
513 n = wireprototypes.encodelist(nodes)
516 n = wireprototypes.encodelist(nodes)
514 f = self._callcompressable("changegroup", roots=n)
517 f = self._callcompressable("changegroup", roots=n)
515 return changegroupmod.cg1unpacker(f, 'UN')
518 return changegroupmod.cg1unpacker(f, 'UN')
516
519
517 def changegroupsubset(self, bases, heads, source):
520 def changegroupsubset(self, bases, heads, source):
518 self.requirecap('changegroupsubset', _('look up remote changes'))
521 self.requirecap('changegroupsubset', _('look up remote changes'))
519 bases = wireprototypes.encodelist(bases)
522 bases = wireprototypes.encodelist(bases)
520 heads = wireprototypes.encodelist(heads)
523 heads = wireprototypes.encodelist(heads)
521 f = self._callcompressable("changegroupsubset",
524 f = self._callcompressable("changegroupsubset",
522 bases=bases, heads=heads)
525 bases=bases, heads=heads)
523 return changegroupmod.cg1unpacker(f, 'UN')
526 return changegroupmod.cg1unpacker(f, 'UN')
524
527
525 # End of ipeerlegacycommands interface.
528 # End of ipeerlegacycommands interface.
526
529
527 def _submitbatch(self, req):
530 def _submitbatch(self, req):
528 """run batch request <req> on the server
531 """run batch request <req> on the server
529
532
530 Returns an iterator of the raw responses from the server.
533 Returns an iterator of the raw responses from the server.
531 """
534 """
532 ui = self.ui
535 ui = self.ui
533 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
536 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
534 ui.debug('devel-peer-request: batched-content\n')
537 ui.debug('devel-peer-request: batched-content\n')
535 for op, args in req:
538 for op, args in req:
536 msg = 'devel-peer-request: - %s (%d arguments)\n'
539 msg = 'devel-peer-request: - %s (%d arguments)\n'
537 ui.debug(msg % (op, len(args)))
540 ui.debug(msg % (op, len(args)))
538
541
539 unescapearg = wireprototypes.unescapebatcharg
542 unescapearg = wireprototypes.unescapebatcharg
540
543
541 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
544 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
542 chunk = rsp.read(1024)
545 chunk = rsp.read(1024)
543 work = [chunk]
546 work = [chunk]
544 while chunk:
547 while chunk:
545 while ';' not in chunk and chunk:
548 while ';' not in chunk and chunk:
546 chunk = rsp.read(1024)
549 chunk = rsp.read(1024)
547 work.append(chunk)
550 work.append(chunk)
548 merged = ''.join(work)
551 merged = ''.join(work)
549 while ';' in merged:
552 while ';' in merged:
550 one, merged = merged.split(';', 1)
553 one, merged = merged.split(';', 1)
551 yield unescapearg(one)
554 yield unescapearg(one)
552 chunk = rsp.read(1024)
555 chunk = rsp.read(1024)
553 work = [merged, chunk]
556 work = [merged, chunk]
554 yield unescapearg(''.join(work))
557 yield unescapearg(''.join(work))
555
558
556 def _submitone(self, op, args):
559 def _submitone(self, op, args):
557 return self._call(op, **pycompat.strkwargs(args))
560 return self._call(op, **pycompat.strkwargs(args))
558
561
559 def debugwireargs(self, one, two, three=None, four=None, five=None):
562 def debugwireargs(self, one, two, three=None, four=None, five=None):
560 # don't pass optional arguments left at their default value
563 # don't pass optional arguments left at their default value
561 opts = {}
564 opts = {}
562 if three is not None:
565 if three is not None:
563 opts[r'three'] = three
566 opts[r'three'] = three
564 if four is not None:
567 if four is not None:
565 opts[r'four'] = four
568 opts[r'four'] = four
566 return self._call('debugwireargs', one=one, two=two, **opts)
569 return self._call('debugwireargs', one=one, two=two, **opts)
567
570
568 def _call(self, cmd, **args):
571 def _call(self, cmd, **args):
569 """execute <cmd> on the server
572 """execute <cmd> on the server
570
573
571 The command is expected to return a simple string.
574 The command is expected to return a simple string.
572
575
573 returns the server reply as a string."""
576 returns the server reply as a string."""
574 raise NotImplementedError()
577 raise NotImplementedError()
575
578
576 def _callstream(self, cmd, **args):
579 def _callstream(self, cmd, **args):
577 """execute <cmd> on the server
580 """execute <cmd> on the server
578
581
579 The command is expected to return a stream. Note that if the
582 The command is expected to return a stream. Note that if the
580 command doesn't return a stream, _callstream behaves
583 command doesn't return a stream, _callstream behaves
581 differently for ssh and http peers.
584 differently for ssh and http peers.
582
585
583 returns the server reply as a file like object.
586 returns the server reply as a file like object.
584 """
587 """
585 raise NotImplementedError()
588 raise NotImplementedError()
586
589
587 def _callcompressable(self, cmd, **args):
590 def _callcompressable(self, cmd, **args):
588 """execute <cmd> on the server
591 """execute <cmd> on the server
589
592
590 The command is expected to return a stream.
593 The command is expected to return a stream.
591
594
592 The stream may have been compressed in some implementations. This
595 The stream may have been compressed in some implementations. This
593 function takes care of the decompression. This is the only difference
596 function takes care of the decompression. This is the only difference
594 with _callstream.
597 with _callstream.
595
598
596 returns the server reply as a file like object.
599 returns the server reply as a file like object.
597 """
600 """
598 raise NotImplementedError()
601 raise NotImplementedError()
599
602
600 def _callpush(self, cmd, fp, **args):
603 def _callpush(self, cmd, fp, **args):
601 """execute a <cmd> on server
604 """execute a <cmd> on server
602
605
603 The command is expected to be related to a push. Push has a special
606 The command is expected to be related to a push. Push has a special
604 return method.
607 return method.
605
608
606 returns the server reply as a (ret, output) tuple. ret is either
609 returns the server reply as a (ret, output) tuple. ret is either
607 empty (error) or a stringified int.
610 empty (error) or a stringified int.
608 """
611 """
609 raise NotImplementedError()
612 raise NotImplementedError()
610
613
611 def _calltwowaystream(self, cmd, fp, **args):
614 def _calltwowaystream(self, cmd, fp, **args):
612 """execute <cmd> on server
615 """execute <cmd> on server
613
616
614 The command will send a stream to the server and get a stream in reply.
617 The command will send a stream to the server and get a stream in reply.
615 """
618 """
616 raise NotImplementedError()
619 raise NotImplementedError()
617
620
618 def _abort(self, exception):
621 def _abort(self, exception):
619 """clearly abort the wire protocol connection and raise the exception
622 """clearly abort the wire protocol connection and raise the exception
620 """
623 """
621 raise NotImplementedError()
624 raise NotImplementedError()
General Comments 0
You need to be logged in to leave comments. Login now