##// END OF EJS Templates
wireproto: implement batching on peer executor interface...
Gregory Szorc -
r37649:2f626233 default
parent child Browse files
Show More
@@ -1,267 +1,270 b''
1 1 # setdiscovery.py - improved discovery of common nodeset for mercurial
2 2 #
3 3 # Copyright 2010 Benoit Boissinot <bboissin@gmail.com>
4 4 # and Peter Arrenbrecht <peter@arrenbrecht.ch>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8 """
9 9 Algorithm works in the following way. You have two repository: local and
10 10 remote. They both contains a DAG of changelists.
11 11
12 12 The goal of the discovery protocol is to find one set of node *common*,
13 13 the set of nodes shared by local and remote.
14 14
15 15 One of the issue with the original protocol was latency, it could
16 16 potentially require lots of roundtrips to discover that the local repo was a
17 17 subset of remote (which is a very common case, you usually have few changes
18 18 compared to upstream, while upstream probably had lots of development).
19 19
20 20 The new protocol only requires one interface for the remote repo: `known()`,
21 21 which given a set of changelists tells you if they are present in the DAG.
22 22
23 23 The algorithm then works as follow:
24 24
25 25 - We will be using three sets, `common`, `missing`, `unknown`. Originally
26 26 all nodes are in `unknown`.
27 27 - Take a sample from `unknown`, call `remote.known(sample)`
28 28 - For each node that remote knows, move it and all its ancestors to `common`
29 29 - For each node that remote doesn't know, move it and all its descendants
30 30 to `missing`
31 31 - Iterate until `unknown` is empty
32 32
33 33 There are a couple optimizations, first is instead of starting with a random
34 34 sample of missing, start by sending all heads, in the case where the local
35 35 repo is a subset, you computed the answer in one round trip.
36 36
37 37 Then you can do something similar to the bisecting strategy used when
38 38 finding faulty changesets. Instead of random samples, you can try picking
39 39 nodes that will maximize the number of nodes that will be
40 40 classified with it (since all ancestors or descendants will be marked as well).
41 41 """
42 42
43 43 from __future__ import absolute_import
44 44
45 45 import collections
46 46 import random
47 47
48 48 from .i18n import _
49 49 from .node import (
50 50 nullid,
51 51 nullrev,
52 52 )
53 53 from . import (
54 54 dagutil,
55 55 error,
56 56 util,
57 57 )
58 58
59 59 def _updatesample(dag, nodes, sample, quicksamplesize=0):
60 60 """update an existing sample to match the expected size
61 61
62 62 The sample is updated with nodes exponentially distant from each head of the
63 63 <nodes> set. (H~1, H~2, H~4, H~8, etc).
64 64
65 65 If a target size is specified, the sampling will stop once this size is
66 66 reached. Otherwise sampling will happen until roots of the <nodes> set are
67 67 reached.
68 68
69 69 :dag: a dag object from dagutil
70 70 :nodes: set of nodes we want to discover (if None, assume the whole dag)
71 71 :sample: a sample to update
72 72 :quicksamplesize: optional target size of the sample"""
73 73 # if nodes is empty we scan the entire graph
74 74 if nodes:
75 75 heads = dag.headsetofconnecteds(nodes)
76 76 else:
77 77 heads = dag.heads()
78 78 dist = {}
79 79 visit = collections.deque(heads)
80 80 seen = set()
81 81 factor = 1
82 82 while visit:
83 83 curr = visit.popleft()
84 84 if curr in seen:
85 85 continue
86 86 d = dist.setdefault(curr, 1)
87 87 if d > factor:
88 88 factor *= 2
89 89 if d == factor:
90 90 sample.add(curr)
91 91 if quicksamplesize and (len(sample) >= quicksamplesize):
92 92 return
93 93 seen.add(curr)
94 94 for p in dag.parents(curr):
95 95 if not nodes or p in nodes:
96 96 dist.setdefault(p, d + 1)
97 97 visit.append(p)
98 98
99 99 def _takequicksample(dag, nodes, size):
100 100 """takes a quick sample of size <size>
101 101
102 102 It is meant for initial sampling and focuses on querying heads and close
103 103 ancestors of heads.
104 104
105 105 :dag: a dag object
106 106 :nodes: set of nodes to discover
107 107 :size: the maximum size of the sample"""
108 108 sample = dag.headsetofconnecteds(nodes)
109 109 if len(sample) >= size:
110 110 return _limitsample(sample, size)
111 111 _updatesample(dag, None, sample, quicksamplesize=size)
112 112 return sample
113 113
114 114 def _takefullsample(dag, nodes, size):
115 115 sample = dag.headsetofconnecteds(nodes)
116 116 # update from heads
117 117 _updatesample(dag, nodes, sample)
118 118 # update from roots
119 119 _updatesample(dag.inverse(), nodes, sample)
120 120 assert sample
121 121 sample = _limitsample(sample, size)
122 122 if len(sample) < size:
123 123 more = size - len(sample)
124 124 sample.update(random.sample(list(nodes - sample), more))
125 125 return sample
126 126
127 127 def _limitsample(sample, desiredlen):
128 128 """return a random subset of sample of at most desiredlen item"""
129 129 if len(sample) > desiredlen:
130 130 sample = set(random.sample(sample, desiredlen))
131 131 return sample
132 132
133 133 def findcommonheads(ui, local, remote,
134 134 initialsamplesize=100,
135 135 fullsamplesize=200,
136 136 abortwhenunrelated=True,
137 137 ancestorsof=None):
138 138 '''Return a tuple (common, anyincoming, remoteheads) used to identify
139 139 missing nodes from or in remote.
140 140 '''
141 141 start = util.timer()
142 142
143 143 roundtrips = 0
144 144 cl = local.changelog
145 145 localsubset = None
146 146 if ancestorsof is not None:
147 147 rev = local.changelog.rev
148 148 localsubset = [rev(n) for n in ancestorsof]
149 149 dag = dagutil.revlogdag(cl, localsubset=localsubset)
150 150
151 151 # early exit if we know all the specified remote heads already
152 152 ui.debug("query 1; heads\n")
153 153 roundtrips += 1
154 154 ownheads = dag.heads()
155 155 sample = _limitsample(ownheads, initialsamplesize)
156 156 # indices between sample and externalized version must match
157 157 sample = list(sample)
158 batch = remote.iterbatch()
159 batch.heads()
160 batch.known(dag.externalizeall(sample))
161 batch.submit()
162 srvheadhashes, yesno = batch.results()
158
159 with remote.commandexecutor() as e:
160 fheads = e.callcommand('heads', {})
161 fknown = e.callcommand('known', {
162 'nodes': dag.externalizeall(sample),
163 })
164
165 srvheadhashes, yesno = fheads.result(), fknown.result()
163 166
164 167 if cl.tip() == nullid:
165 168 if srvheadhashes != [nullid]:
166 169 return [nullid], True, srvheadhashes
167 170 return [nullid], False, []
168 171
169 172 # start actual discovery (we note this before the next "if" for
170 173 # compatibility reasons)
171 174 ui.status(_("searching for changes\n"))
172 175
173 176 srvheads = dag.internalizeall(srvheadhashes, filterunknown=True)
174 177 if len(srvheads) == len(srvheadhashes):
175 178 ui.debug("all remote heads known locally\n")
176 179 return (srvheadhashes, False, srvheadhashes,)
177 180
178 181 if len(sample) == len(ownheads) and all(yesno):
179 182 ui.note(_("all local heads known remotely\n"))
180 183 ownheadhashes = dag.externalizeall(ownheads)
181 184 return (ownheadhashes, True, srvheadhashes,)
182 185
183 186 # full blown discovery
184 187
185 188 # own nodes I know we both know
186 189 # treat remote heads (and maybe own heads) as a first implicit sample
187 190 # response
188 191 common = cl.incrementalmissingrevs(srvheads)
189 192 commoninsample = set(n for i, n in enumerate(sample) if yesno[i])
190 193 common.addbases(commoninsample)
191 194 # own nodes where I don't know if remote knows them
192 195 undecided = set(common.missingancestors(ownheads))
193 196 # own nodes I know remote lacks
194 197 missing = set()
195 198
196 199 full = False
197 200 while undecided:
198 201
199 202 if sample:
200 203 missinginsample = [n for i, n in enumerate(sample) if not yesno[i]]
201 204 missing.update(dag.descendantset(missinginsample, missing))
202 205
203 206 undecided.difference_update(missing)
204 207
205 208 if not undecided:
206 209 break
207 210
208 211 if full or common.hasbases():
209 212 if full:
210 213 ui.note(_("sampling from both directions\n"))
211 214 else:
212 215 ui.debug("taking initial sample\n")
213 216 samplefunc = _takefullsample
214 217 targetsize = fullsamplesize
215 218 else:
216 219 # use even cheaper initial sample
217 220 ui.debug("taking quick initial sample\n")
218 221 samplefunc = _takequicksample
219 222 targetsize = initialsamplesize
220 223 if len(undecided) < targetsize:
221 224 sample = list(undecided)
222 225 else:
223 226 sample = samplefunc(dag, undecided, targetsize)
224 227
225 228 roundtrips += 1
226 229 ui.progress(_('searching'), roundtrips, unit=_('queries'))
227 230 ui.debug("query %i; still undecided: %i, sample size is: %i\n"
228 231 % (roundtrips, len(undecided), len(sample)))
229 232 # indices between sample and externalized version must match
230 233 sample = list(sample)
231 234
232 235 with remote.commandexecutor() as e:
233 236 yesno = e.callcommand('known', {
234 237 'nodes': dag.externalizeall(sample),
235 238 }).result()
236 239
237 240 full = True
238 241
239 242 if sample:
240 243 commoninsample = set(n for i, n in enumerate(sample) if yesno[i])
241 244 common.addbases(commoninsample)
242 245 common.removeancestorsfrom(undecided)
243 246
244 247 # heads(common) == heads(common.bases) since common represents common.bases
245 248 # and all its ancestors
246 249 result = dag.headsetofconnecteds(common.bases)
247 250 # common.bases can include nullrev, but our contract requires us to not
248 251 # return any heads in that case, so discard that
249 252 result.discard(nullrev)
250 253 elapsed = util.timer() - start
251 254 ui.progress(_('searching'), None)
252 255 ui.debug("%d total queries in %.4fs\n" % (roundtrips, elapsed))
253 256 msg = ('found %d common and %d unknown server heads,'
254 257 ' %d roundtrips in %.4fs\n')
255 258 missing = set(result) - set(srvheads)
256 259 ui.log('discovery', msg, len(result), len(missing), roundtrips,
257 260 elapsed)
258 261
259 262 if not result and srvheadhashes != [nullid]:
260 263 if abortwhenunrelated:
261 264 raise error.Abort(_("repository is unrelated"))
262 265 else:
263 266 ui.warn(_("warning: repository is unrelated\n"))
264 267 return ({nullid}, True, srvheadhashes,)
265 268
266 269 anyincoming = (srvheadhashes != [nullid])
267 270 return dag.externalizeall(result), anyincoming, srvheadhashes
@@ -1,576 +1,708 b''
1 1 # wireprotov1peer.py - Client-side functionality for wire protocol version 1.
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import sys
12 import weakref
12 13
13 14 from .i18n import _
14 15 from .node import (
15 16 bin,
16 17 )
17 18 from .thirdparty.zope import (
18 19 interface as zi,
19 20 )
20 21 from . import (
21 22 bundle2,
22 23 changegroup as changegroupmod,
23 24 encoding,
24 25 error,
25 26 pushkey as pushkeymod,
26 27 pycompat,
27 28 repository,
28 29 util,
29 30 wireprototypes,
30 31 )
31 32
32 33 urlreq = util.urlreq
33 34
34 35 def batchable(f):
35 36 '''annotation for batchable methods
36 37
37 38 Such methods must implement a coroutine as follows:
38 39
39 40 @batchable
40 41 def sample(self, one, two=None):
41 42 # Build list of encoded arguments suitable for your wire protocol:
42 43 encargs = [('one', encode(one),), ('two', encode(two),)]
43 44 # Create future for injection of encoded result:
44 45 encresref = future()
45 46 # Return encoded arguments and future:
46 47 yield encargs, encresref
47 48 # Assuming the future to be filled with the result from the batched
48 49 # request now. Decode it:
49 50 yield decode(encresref.value)
50 51
51 52 The decorator returns a function which wraps this coroutine as a plain
52 53 method, but adds the original method as an attribute called "batchable",
53 54 which is used by remotebatch to split the call into separate encoding and
54 55 decoding phases.
55 56 '''
56 57 def plain(*args, **opts):
57 58 batchable = f(*args, **opts)
58 59 encargsorres, encresref = next(batchable)
59 60 if not encresref:
60 61 return encargsorres # a local result in this case
61 62 self = args[0]
62 63 cmd = pycompat.bytesurl(f.__name__) # ensure cmd is ascii bytestr
63 64 encresref.set(self._submitone(cmd, encargsorres))
64 65 return next(batchable)
65 66 setattr(plain, 'batchable', f)
66 67 return plain
67 68
68 69 class future(object):
69 70 '''placeholder for a value to be set later'''
70 71 def set(self, value):
71 72 if util.safehasattr(self, 'value'):
72 73 raise error.RepoError("future is already set")
73 74 self.value = value
74 75
75 76 class batcher(object):
76 77 '''base class for batches of commands submittable in a single request
77 78
78 79 All methods invoked on instances of this class are simply queued and
79 80 return a a future for the result. Once you call submit(), all the queued
80 81 calls are performed and the results set in their respective futures.
81 82 '''
82 83 def __init__(self):
83 84 self.calls = []
84 85 def __getattr__(self, name):
85 86 def call(*args, **opts):
86 87 resref = future()
87 88 # Please don't invent non-ascii method names, or you will
88 89 # give core hg a very sad time.
89 90 self.calls.append((name.encode('ascii'), args, opts, resref,))
90 91 return resref
91 92 return call
92 93 def submit(self):
93 94 raise NotImplementedError()
94 95
95 96 class iterbatcher(batcher):
96 97
97 98 def submit(self):
98 99 raise NotImplementedError()
99 100
100 101 def results(self):
101 102 raise NotImplementedError()
102 103
103 104 class remoteiterbatcher(iterbatcher):
104 105 def __init__(self, remote):
105 106 super(remoteiterbatcher, self).__init__()
106 107 self._remote = remote
107 108
108 109 def __getattr__(self, name):
109 110 # Validate this method is batchable, since submit() only supports
110 111 # batchable methods.
111 112 fn = getattr(self._remote, name)
112 113 if not getattr(fn, 'batchable', None):
113 114 raise error.ProgrammingError('Attempted to batch a non-batchable '
114 115 'call to %r' % name)
115 116
116 117 return super(remoteiterbatcher, self).__getattr__(name)
117 118
118 119 def submit(self):
119 120 """Break the batch request into many patch calls and pipeline them.
120 121
121 122 This is mostly valuable over http where request sizes can be
122 123 limited, but can be used in other places as well.
123 124 """
124 125 # 2-tuple of (command, arguments) that represents what will be
125 126 # sent over the wire.
126 127 requests = []
127 128
128 129 # 4-tuple of (command, final future, @batchable generator, remote
129 130 # future).
130 131 results = []
131 132
132 133 for command, args, opts, finalfuture in self.calls:
133 134 mtd = getattr(self._remote, command)
134 135 batchable = mtd.batchable(mtd.__self__, *args, **opts)
135 136
136 137 commandargs, fremote = next(batchable)
137 138 assert fremote
138 139 requests.append((command, commandargs))
139 140 results.append((command, finalfuture, batchable, fremote))
140 141
141 142 if requests:
142 143 self._resultiter = self._remote._submitbatch(requests)
143 144
144 145 self._results = results
145 146
146 147 def results(self):
147 148 for command, finalfuture, batchable, remotefuture in self._results:
148 149 # Get the raw result, set it in the remote future, feed it
149 150 # back into the @batchable generator so it can be decoded, and
150 151 # set the result on the final future to this value.
151 152 remoteresult = next(self._resultiter)
152 153 remotefuture.set(remoteresult)
153 154 finalfuture.set(next(batchable))
154 155
155 156 # Verify our @batchable generators only emit 2 values.
156 157 try:
157 158 next(batchable)
158 159 except StopIteration:
159 160 pass
160 161 else:
161 162 raise error.ProgrammingError('%s @batchable generator emitted '
162 163 'unexpected value count' % command)
163 164
164 165 yield finalfuture.value
165 166
166 167 def encodebatchcmds(req):
167 168 """Return a ``cmds`` argument value for the ``batch`` command."""
168 169 escapearg = wireprototypes.escapebatcharg
169 170
170 171 cmds = []
171 172 for op, argsdict in req:
172 173 # Old servers didn't properly unescape argument names. So prevent
173 174 # the sending of argument names that may not be decoded properly by
174 175 # servers.
175 176 assert all(escapearg(k) == k for k in argsdict)
176 177
177 178 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
178 179 for k, v in argsdict.iteritems())
179 180 cmds.append('%s %s' % (op, args))
180 181
181 182 return ';'.join(cmds)
182 183
184 class unsentfuture(pycompat.futures.Future):
185 """A Future variation to represent an unsent command.
186
187 Because we buffer commands and don't submit them immediately, calling
188 ``result()`` on an unsent future could deadlock. Futures for buffered
189 commands are represented by this type, which wraps ``result()`` to
190 call ``sendcommands()``.
191 """
192
193 def result(self, timeout=None):
194 if self.done():
195 return pycompat.futures.Future.result(self, timeout)
196
197 self._peerexecutor.sendcommands()
198
199 # This looks like it will infinitely recurse. However,
200 # sendcommands() should modify __class__. This call serves as a check
201 # on that.
202 return self.result(timeout)
203
183 204 @zi.implementer(repository.ipeercommandexecutor)
184 205 class peerexecutor(object):
185 206 def __init__(self, peer):
186 207 self._peer = peer
187 208 self._sent = False
188 209 self._closed = False
189 210 self._calls = []
211 self._futures = weakref.WeakSet()
212 self._responseexecutor = None
213 self._responsef = None
190 214
191 215 def __enter__(self):
192 216 return self
193 217
194 218 def __exit__(self, exctype, excvalee, exctb):
195 219 self.close()
196 220
197 221 def callcommand(self, command, args):
198 222 if self._sent:
199 223 raise error.ProgrammingError('callcommand() cannot be used '
200 224 'after commands are sent')
201 225
202 226 if self._closed:
203 227 raise error.ProgrammingError('callcommand() cannot be used '
204 228 'after close()')
205 229
206 230 # Commands are dispatched through methods on the peer.
207 231 fn = getattr(self._peer, pycompat.sysstr(command), None)
208 232
209 233 if not fn:
210 234 raise error.ProgrammingError(
211 235 'cannot call command %s: method of same name not available '
212 236 'on peer' % command)
213 237
214 238 # Commands are either batchable or they aren't. If a command
215 239 # isn't batchable, we send it immediately because the executor
216 240 # can no longer accept new commands after a non-batchable command.
217 # If a command is batchable, we queue it for later.
241 # If a command is batchable, we queue it for later. But we have
242 # to account for the case of a non-batchable command arriving after
243 # a batchable one and refuse to service it.
244
245 def addcall():
246 f = pycompat.futures.Future()
247 self._futures.add(f)
248 self._calls.append((command, args, fn, f))
249 return f
218 250
219 251 if getattr(fn, 'batchable', False):
220 pass
252 f = addcall()
253
254 # But since we don't issue it immediately, we wrap its result()
255 # to trigger sending so we avoid deadlocks.
256 f.__class__ = unsentfuture
257 f._peerexecutor = self
221 258 else:
222 259 if self._calls:
223 260 raise error.ProgrammingError(
224 261 '%s is not batchable and cannot be called on a command '
225 262 'executor along with other commands' % command)
226 263
227 # We don't support batching yet. So resolve it immediately.
228 f = pycompat.futures.Future()
229 self._calls.append((command, args, fn, f))
264 f = addcall()
265
266 # Non-batchable commands can never coexist with another command
267 # in this executor. So send the command immediately.
230 268 self.sendcommands()
269
231 270 return f
232 271
233 272 def sendcommands(self):
234 273 if self._sent:
235 274 return
236 275
237 276 if not self._calls:
238 277 return
239 278
240 279 self._sent = True
241 280
281 # Unhack any future types so caller seens a clean type and to break
282 # cycle between us and futures.
283 for f in self._futures:
284 if isinstance(f, unsentfuture):
285 f.__class__ = pycompat.futures.Future
286 f._peerexecutor = None
287
242 288 calls = self._calls
243 289 # Mainly to destroy references to futures.
244 290 self._calls = None
245 291
292 # Simple case of a single command. We call it synchronously.
246 293 if len(calls) == 1:
247 294 command, args, fn, f = calls[0]
248 295
249 296 # Future was cancelled. Ignore it.
250 297 if not f.set_running_or_notify_cancel():
251 298 return
252 299
253 300 try:
254 301 result = fn(**pycompat.strkwargs(args))
255 302 except Exception:
256 303 f.set_exception_info(*sys.exc_info()[1:])
257 304 else:
258 305 f.set_result(result)
259 306
260 307 return
261 308
262 raise error.ProgrammingError('support for multiple commands not '
263 'yet implemented')
309 # Batch commands are a bit harder. First, we have to deal with the
310 # @batchable coroutine. That's a bit annoying. Furthermore, we also
311 # need to preserve streaming. i.e. it should be possible for the
312 # futures to resolve as data is coming in off the wire without having
313 # to wait for the final byte of the final response. We do this by
314 # spinning up a thread to read the responses.
315
316 requests = []
317 states = []
318
319 for command, args, fn, f in calls:
320 # Future was cancelled. Ignore it.
321 if not f.set_running_or_notify_cancel():
322 continue
323
324 try:
325 batchable = fn.batchable(fn.__self__,
326 **pycompat.strkwargs(args))
327 except Exception:
328 f.set_exception_info(*sys.exc_info()[1:])
329 return
330
331 # Encoded arguments and future holding remote result.
332 try:
333 encodedargs, fremote = next(batchable)
334 except Exception:
335 f.set_exception_info(*sys.exc_info()[1:])
336 return
337
338 requests.append((command, encodedargs))
339 states.append((command, f, batchable, fremote))
340
341 if not requests:
342 return
343
344 # This will emit responses in order they were executed.
345 wireresults = self._peer._submitbatch(requests)
346
347 # The use of a thread pool executor here is a bit weird for something
348 # that only spins up a single thread. However, thread management is
349 # hard and it is easy to encounter race conditions, deadlocks, etc.
350 # concurrent.futures already solves these problems and its thread pool
351 # executor has minimal overhead. So we use it.
352 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
353 self._responsef = self._responseexecutor.submit(self._readbatchresponse,
354 states, wireresults)
264 355
265 356 def close(self):
266 357 self.sendcommands()
267 358
359 if self._closed:
360 return
361
268 362 self._closed = True
269 363
364 if not self._responsef:
365 return
366
367 # We need to wait on our in-flight response and then shut down the
368 # executor once we have a result.
369 try:
370 self._responsef.result()
371 finally:
372 self._responseexecutor.shutdown(wait=True)
373 self._responsef = None
374 self._responseexecutor = None
375
376 # If any of our futures are still in progress, mark them as
377 # errored. Otherwise a result() could wait indefinitely.
378 for f in self._futures:
379 if not f.done():
380 f.set_exception(error.ResponseError(
381 _('unfulfilled batch command response')))
382
383 self._futures = None
384
385 def _readbatchresponse(self, states, wireresults):
386 # Executes in a thread to read data off the wire.
387
388 for command, f, batchable, fremote in states:
389 # Grab raw result off the wire and teach the internal future
390 # about it.
391 remoteresult = next(wireresults)
392 fremote.set(remoteresult)
393
394 # And ask the coroutine to decode that value.
395 try:
396 result = next(batchable)
397 except Exception:
398 f.set_exception_info(*sys.exc_info()[1:])
399 else:
400 f.set_result(result)
401
270 402 class wirepeer(repository.legacypeer):
271 403 """Client-side interface for communicating with a peer repository.
272 404
273 405 Methods commonly call wire protocol commands of the same name.
274 406
275 407 See also httppeer.py and sshpeer.py for protocol-specific
276 408 implementations of this interface.
277 409 """
278 410 def commandexecutor(self):
279 411 return peerexecutor(self)
280 412
281 413 # Begin of ipeercommands interface.
282 414
283 415 def iterbatch(self):
284 416 return remoteiterbatcher(self)
285 417
286 418 @batchable
287 419 def lookup(self, key):
288 420 self.requirecap('lookup', _('look up remote revision'))
289 421 f = future()
290 422 yield {'key': encoding.fromlocal(key)}, f
291 423 d = f.value
292 424 success, data = d[:-1].split(" ", 1)
293 425 if int(success):
294 426 yield bin(data)
295 427 else:
296 428 self._abort(error.RepoError(data))
297 429
298 430 @batchable
299 431 def heads(self):
300 432 f = future()
301 433 yield {}, f
302 434 d = f.value
303 435 try:
304 436 yield wireprototypes.decodelist(d[:-1])
305 437 except ValueError:
306 438 self._abort(error.ResponseError(_("unexpected response:"), d))
307 439
308 440 @batchable
309 441 def known(self, nodes):
310 442 f = future()
311 443 yield {'nodes': wireprototypes.encodelist(nodes)}, f
312 444 d = f.value
313 445 try:
314 446 yield [bool(int(b)) for b in d]
315 447 except ValueError:
316 448 self._abort(error.ResponseError(_("unexpected response:"), d))
317 449
318 450 @batchable
319 451 def branchmap(self):
320 452 f = future()
321 453 yield {}, f
322 454 d = f.value
323 455 try:
324 456 branchmap = {}
325 457 for branchpart in d.splitlines():
326 458 branchname, branchheads = branchpart.split(' ', 1)
327 459 branchname = encoding.tolocal(urlreq.unquote(branchname))
328 460 branchheads = wireprototypes.decodelist(branchheads)
329 461 branchmap[branchname] = branchheads
330 462 yield branchmap
331 463 except TypeError:
332 464 self._abort(error.ResponseError(_("unexpected response:"), d))
333 465
334 466 @batchable
335 467 def listkeys(self, namespace):
336 468 if not self.capable('pushkey'):
337 469 yield {}, None
338 470 f = future()
339 471 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
340 472 yield {'namespace': encoding.fromlocal(namespace)}, f
341 473 d = f.value
342 474 self.ui.debug('received listkey for "%s": %i bytes\n'
343 475 % (namespace, len(d)))
344 476 yield pushkeymod.decodekeys(d)
345 477
346 478 @batchable
347 479 def pushkey(self, namespace, key, old, new):
348 480 if not self.capable('pushkey'):
349 481 yield False, None
350 482 f = future()
351 483 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
352 484 yield {'namespace': encoding.fromlocal(namespace),
353 485 'key': encoding.fromlocal(key),
354 486 'old': encoding.fromlocal(old),
355 487 'new': encoding.fromlocal(new)}, f
356 488 d = f.value
357 489 d, output = d.split('\n', 1)
358 490 try:
359 491 d = bool(int(d))
360 492 except ValueError:
361 493 raise error.ResponseError(
362 494 _('push failed (unexpected response):'), d)
363 495 for l in output.splitlines(True):
364 496 self.ui.status(_('remote: '), l)
365 497 yield d
366 498
367 499 def stream_out(self):
368 500 return self._callstream('stream_out')
369 501
370 502 def getbundle(self, source, **kwargs):
371 503 kwargs = pycompat.byteskwargs(kwargs)
372 504 self.requirecap('getbundle', _('look up remote changes'))
373 505 opts = {}
374 506 bundlecaps = kwargs.get('bundlecaps') or set()
375 507 for key, value in kwargs.iteritems():
376 508 if value is None:
377 509 continue
378 510 keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
379 511 if keytype is None:
380 512 raise error.ProgrammingError(
381 513 'Unexpectedly None keytype for key %s' % key)
382 514 elif keytype == 'nodes':
383 515 value = wireprototypes.encodelist(value)
384 516 elif keytype == 'csv':
385 517 value = ','.join(value)
386 518 elif keytype == 'scsv':
387 519 value = ','.join(sorted(value))
388 520 elif keytype == 'boolean':
389 521 value = '%i' % bool(value)
390 522 elif keytype != 'plain':
391 523 raise KeyError('unknown getbundle option type %s'
392 524 % keytype)
393 525 opts[key] = value
394 526 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
395 527 if any((cap.startswith('HG2') for cap in bundlecaps)):
396 528 return bundle2.getunbundler(self.ui, f)
397 529 else:
398 530 return changegroupmod.cg1unpacker(f, 'UN')
399 531
400 532 def unbundle(self, cg, heads, url):
401 533 '''Send cg (a readable file-like object representing the
402 534 changegroup to push, typically a chunkbuffer object) to the
403 535 remote server as a bundle.
404 536
405 537 When pushing a bundle10 stream, return an integer indicating the
406 538 result of the push (see changegroup.apply()).
407 539
408 540 When pushing a bundle20 stream, return a bundle20 stream.
409 541
410 542 `url` is the url the client thinks it's pushing to, which is
411 543 visible to hooks.
412 544 '''
413 545
414 546 if heads != ['force'] and self.capable('unbundlehash'):
415 547 heads = wireprototypes.encodelist(
416 548 ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()])
417 549 else:
418 550 heads = wireprototypes.encodelist(heads)
419 551
420 552 if util.safehasattr(cg, 'deltaheader'):
421 553 # this a bundle10, do the old style call sequence
422 554 ret, output = self._callpush("unbundle", cg, heads=heads)
423 555 if ret == "":
424 556 raise error.ResponseError(
425 557 _('push failed:'), output)
426 558 try:
427 559 ret = int(ret)
428 560 except ValueError:
429 561 raise error.ResponseError(
430 562 _('push failed (unexpected response):'), ret)
431 563
432 564 for l in output.splitlines(True):
433 565 self.ui.status(_('remote: '), l)
434 566 else:
435 567 # bundle2 push. Send a stream, fetch a stream.
436 568 stream = self._calltwowaystream('unbundle', cg, heads=heads)
437 569 ret = bundle2.getunbundler(self.ui, stream)
438 570 return ret
439 571
440 572 # End of ipeercommands interface.
441 573
442 574 # Begin of ipeerlegacycommands interface.
443 575
444 576 def branches(self, nodes):
445 577 n = wireprototypes.encodelist(nodes)
446 578 d = self._call("branches", nodes=n)
447 579 try:
448 580 br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
449 581 return br
450 582 except ValueError:
451 583 self._abort(error.ResponseError(_("unexpected response:"), d))
452 584
453 585 def between(self, pairs):
454 586 batch = 8 # avoid giant requests
455 587 r = []
456 588 for i in xrange(0, len(pairs), batch):
457 589 n = " ".join([wireprototypes.encodelist(p, '-')
458 590 for p in pairs[i:i + batch]])
459 591 d = self._call("between", pairs=n)
460 592 try:
461 593 r.extend(l and wireprototypes.decodelist(l) or []
462 594 for l in d.splitlines())
463 595 except ValueError:
464 596 self._abort(error.ResponseError(_("unexpected response:"), d))
465 597 return r
466 598
467 599 def changegroup(self, nodes, kind):
468 600 n = wireprototypes.encodelist(nodes)
469 601 f = self._callcompressable("changegroup", roots=n)
470 602 return changegroupmod.cg1unpacker(f, 'UN')
471 603
472 604 def changegroupsubset(self, bases, heads, kind):
473 605 self.requirecap('changegroupsubset', _('look up remote changes'))
474 606 bases = wireprototypes.encodelist(bases)
475 607 heads = wireprototypes.encodelist(heads)
476 608 f = self._callcompressable("changegroupsubset",
477 609 bases=bases, heads=heads)
478 610 return changegroupmod.cg1unpacker(f, 'UN')
479 611
480 612 # End of ipeerlegacycommands interface.
481 613
482 614 def _submitbatch(self, req):
483 615 """run batch request <req> on the server
484 616
485 617 Returns an iterator of the raw responses from the server.
486 618 """
487 619 ui = self.ui
488 620 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
489 621 ui.debug('devel-peer-request: batched-content\n')
490 622 for op, args in req:
491 623 msg = 'devel-peer-request: - %s (%d arguments)\n'
492 624 ui.debug(msg % (op, len(args)))
493 625
494 626 unescapearg = wireprototypes.unescapebatcharg
495 627
496 628 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
497 629 chunk = rsp.read(1024)
498 630 work = [chunk]
499 631 while chunk:
500 632 while ';' not in chunk and chunk:
501 633 chunk = rsp.read(1024)
502 634 work.append(chunk)
503 635 merged = ''.join(work)
504 636 while ';' in merged:
505 637 one, merged = merged.split(';', 1)
506 638 yield unescapearg(one)
507 639 chunk = rsp.read(1024)
508 640 work = [merged, chunk]
509 641 yield unescapearg(''.join(work))
510 642
511 643 def _submitone(self, op, args):
512 644 return self._call(op, **pycompat.strkwargs(args))
513 645
514 646 def debugwireargs(self, one, two, three=None, four=None, five=None):
515 647 # don't pass optional arguments left at their default value
516 648 opts = {}
517 649 if three is not None:
518 650 opts[r'three'] = three
519 651 if four is not None:
520 652 opts[r'four'] = four
521 653 return self._call('debugwireargs', one=one, two=two, **opts)
522 654
523 655 def _call(self, cmd, **args):
524 656 """execute <cmd> on the server
525 657
526 658 The command is expected to return a simple string.
527 659
528 660 returns the server reply as a string."""
529 661 raise NotImplementedError()
530 662
531 663 def _callstream(self, cmd, **args):
532 664 """execute <cmd> on the server
533 665
534 666 The command is expected to return a stream. Note that if the
535 667 command doesn't return a stream, _callstream behaves
536 668 differently for ssh and http peers.
537 669
538 670 returns the server reply as a file like object.
539 671 """
540 672 raise NotImplementedError()
541 673
542 674 def _callcompressable(self, cmd, **args):
543 675 """execute <cmd> on the server
544 676
545 677 The command is expected to return a stream.
546 678
547 679 The stream may have been compressed in some implementations. This
548 680 function takes care of the decompression. This is the only difference
549 681 with _callstream.
550 682
551 683 returns the server reply as a file like object.
552 684 """
553 685 raise NotImplementedError()
554 686
555 687 def _callpush(self, cmd, fp, **args):
556 688 """execute a <cmd> on server
557 689
558 690 The command is expected to be related to a push. Push has a special
559 691 return method.
560 692
561 693 returns the server reply as a (ret, output) tuple. ret is either
562 694 empty (error) or a stringified int.
563 695 """
564 696 raise NotImplementedError()
565 697
566 698 def _calltwowaystream(self, cmd, fp, **args):
567 699 """execute <cmd> on server
568 700
569 701 The command will send a stream to the server and get a stream in reply.
570 702 """
571 703 raise NotImplementedError()
572 704
573 705 def _abort(self, exception):
574 706 """clearly abort the wire protocol connection and raise the exception
575 707 """
576 708 raise NotImplementedError()
General Comments 0
You need to be logged in to leave comments. Login now