clonebundle: use 'repo.vfs' instead of 'repo.opener'...
Pierre-Yves David -
r31146:16d8bec0 default
@@ -1,186 +1,186
1 1 # This software may be used and distributed according to the terms of the
2 2 # GNU General Public License version 2 or any later version.
3 3
4 4 """advertise pre-generated bundles to seed clones
5 5
6 6 "clonebundles" is a server-side extension used to advertise the existence
7 7 of pre-generated, externally hosted bundle files to clients that are
8 8 cloning so that cloning can be faster, more reliable, and require fewer
9 9 resources on the server.
10 10
11 11 Cloning can be a CPU and I/O intensive operation on servers. Traditionally,
12 12 the server, in response to a client's request to clone, dynamically generates
13 13 a bundle containing the entire repository content and sends it to the client.
14 14 There is no caching on the server and the server will have to redundantly
15 15 generate the same outgoing bundle in response to each clone request. For
16 16 servers with large repositories or with high clone volume, the load from
17 17 clones can make scaling the server challenging and costly.
18 18
19 19 This extension provides server operators the ability to offload potentially
20 20 expensive clone load to an external service. Here's how it works.
21 21
22 22 1. A server operator establishes a mechanism for making bundle files available
23 23 on a hosting service where Mercurial clients can fetch them.
24 24 2. A manifest file listing available bundle URLs and some optional metadata
25 25 is added to the Mercurial repository on the server.
26 26 3. A client initiates a clone against a clone-bundles-aware server.
27 27 4. The client sees the server is advertising clone bundles and fetches the
28 28 manifest listing available bundles.
29 29 5. The client filters and sorts the available bundles based on what it
30 30 supports and prefers.
31 31 6. The client downloads and applies an available bundle from the
32 32 server-specified URL.
33 33 7. The client reconnects to the original server and performs the equivalent
34 34 of :hg:`pull` to retrieve all repository data not in the bundle. (The
35 35 repository could have been updated between when the bundle was created
36 36 and when the client started the clone.)
37 37
38 38 Instead of the server generating full repository bundles for every clone
39 39 request, it generates full bundles once and they are subsequently reused to
40 40 bootstrap new clones. The server may still transfer data at clone time.
41 41 However, this is only data that has been added/changed since the bundle was
42 42 created. For large, established repositories, this can reduce server load for
43 43 clones to less than 1% of original.
44 44
45 45 To work, this extension requires the following of server operators:
46 46
47 47 * Generating bundle files of repository content (typically periodically,
48 48 such as once per day).
49 49 * A file server that clients have network access to and that Python knows
50 50 how to talk to through its normal URL handling facility (typically an
51 51 HTTP server).
52 52 * A process for keeping the bundles manifest in sync with available bundle
53 53 files.
54 54
55 55 Strictly speaking, using a static file hosting server isn't required: a server
56 56 operator could use a dynamic service for retrieving bundle data. However,
57 57 static file hosting services are simple and scalable and should be sufficient
58 58 for most needs.
59 59
60 60 Bundle files can be generated with the :hg:`bundle` command. Typically
61 61 :hg:`bundle --all` is used to produce a bundle of the entire repository.
62 62
63 63 :hg:`debugcreatestreamclonebundle` can be used to produce a special
64 64 *streaming clone bundle*. These are bundle files that are extremely efficient
65 65 to produce and consume (read: fast). However, they are larger than
66 66 traditional bundle formats and require that clients support the exact set
67 67 of repository data store formats in use by the repository that created them.
68 68 Typically, a newer server can serve data that is compatible with older clients.
69 69 However, *streaming clone bundles* don't have this guarantee. **Server
70 70 operators need to be aware that newer versions of Mercurial may produce
71 71 streaming clone bundles incompatible with older Mercurial versions.**
72 72
73 73 A server operator is responsible for creating a ``.hg/clonebundles.manifest``
74 74 file containing the list of available bundle files suitable for seeding
75 75 clones. If this file does not exist, the repository will not advertise the
76 76 existence of clone bundles when clients connect.
77 77
78 78 The manifest file contains a newline (\n) delimited list of entries.
79 79
80 80 Each line in this file defines an available bundle. Lines have the format:
81 81
82 82 <URL> [<key>=<value>[ <key>=<value>]]
83 83
84 84 That is, a URL followed by an optional, space-delimited list of key=value
85 85 pairs describing additional properties of this bundle. Both keys and values
86 86 are URI encoded.
87 87
88 88 Keys in UPPERCASE are reserved for use by Mercurial and are defined below.
89 89 All non-uppercase keys can be used by site installations. An example use
90 90 for custom properties is to use the *datacenter* attribute to define which
91 91 data center a file is hosted in. Clients could then prefer a server in the
92 92 data center closest to them.
93 93
94 94 The following reserved keys are currently defined:
95 95
96 96 BUNDLESPEC
97 97 A "bundle specification" string that describes the type of the bundle.
98 98
99 99 These are string values that are accepted by the "--type" argument of
100 100 :hg:`bundle`.
101 101
102 102 The values are parsed in strict mode, which means they must be of the
103 103 "<compression>-<type>" form. See
104 104 mercurial.exchange.parsebundlespec() for more details.
105 105
106 106 :hg:`debugbundle --spec` can be used to print the bundle specification
107 107 string for a bundle file. The output of this command can be used verbatim
108 108 for the value of ``BUNDLESPEC`` (it is already escaped).
109 109
110 110 Clients will automatically filter out specifications that are unknown or
111 111 unsupported so they won't attempt to download something that likely won't
112 112 apply.
113 113
114 114 The actual value doesn't impact client behavior beyond filtering:
115 115 clients will still sniff the bundle type from the header of downloaded
116 116 files.
117 117
118 118 **Use of this key is highly recommended**, as it allows clients to
119 119 easily skip unsupported bundles. If this key is not defined, an old
120 120 client may attempt to apply a bundle that it is incapable of reading.
121 121
122 122 REQUIRESNI
123 123 Whether Server Name Indication (SNI) is required to connect to the URL.
124 124 SNI allows servers to use multiple certificates on the same IP. It is
125 125 somewhat common in CDNs and other hosting providers. Older Python
126 126 versions do not support SNI. Defining this attribute enables clients
127 127 with older Python versions to filter this entry without experiencing
128 128 an opaque SSL failure at connection time.
129 129
130 130 If this is defined, it is important to advertise a non-SNI fallback
131 131 URL or clients running old Python releases may not be able to clone
132 132 with the clonebundles facility.
133 133
134 134 Value should be "true".
135 135
136 136 Manifests can contain multiple entries. Assuming metadata is defined, clients
137 137 will filter entries from the manifest that they don't support. The remaining
138 138 entries are optionally sorted by client preferences
139 139 (``experimental.clonebundleprefers`` config option). The client then attempts
140 140 to fetch the bundle at the first URL in the remaining list.
141 141
142 142 **Errors when downloading a bundle will fail the entire clone operation:
143 143 clients do not automatically fall back to a traditional clone.** The reason
144 144 for this is that if a server is using clone bundles, it is probably doing so
145 145 because the feature is necessary to help it scale. In other words, there
146 146 is an assumption that clone load will be offloaded to another service and
147 147 that the Mercurial server isn't responsible for serving this clone load.
148 148 If that other service experiences issues and clients start mass falling back to
149 149 the original Mercurial server, the added clone load could overwhelm the server
150 150 due to unexpected load and effectively take it offline. Not having clients
151 151 automatically fall back to cloning from the original server mitigates this
152 152 scenario.
153 153
154 154 Because there is no automatic Mercurial server fallback on failure of the
155 155 bundle hosting service, it is important for server operators to view the bundle
156 156 hosting service as an extension of the Mercurial server in terms of
157 157 availability and service level agreements: if the bundle hosting service goes
158 158 down, so does the ability for clients to clone. Note: clients will see a
159 159 message informing them how to bypass the clone bundles facility when a failure
160 160 occurs. So server operators should prepare for some people to follow these
161 161 instructions when a failure occurs, thus driving more load to the original
162 162 Mercurial server when the bundle hosting service fails.
163 163 """
164 164
165 165 from __future__ import absolute_import
166 166
167 167 from mercurial import (
168 168 extensions,
169 169 wireproto,
170 170 )
171 171
172 172 testedwith = 'ships-with-hg-core'
173 173
174 174 def capabilities(orig, repo, proto):
175 175 caps = orig(repo, proto)
176 176
177 177 # Only advertise if a manifest exists. This does add some I/O to requests.
178 178 # But this should be cheaper than a wasted network round trip due to
179 179 # missing file.
180 - if repo.opener.exists('clonebundles.manifest'):
180 + if repo.vfs.exists('clonebundles.manifest'):
181 181 caps.append('clonebundles')
182 182
183 183 return caps
184 184
185 185 def extsetup(ui):
186 186 extensions.wrapfunction(wireproto, '_capabilities', capabilities)
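Reviewer note on the manifest format described in the docstring above: the sketch below shows one way the `<URL> [<key>=<value>[ <key>=<value>]]` lines could be parsed and then filtered by `BUNDLESPEC`. It is an illustration only, not Mercurial's own parser. The helper names `parse_manifest` and `filter_supported`, the `example.com` URLs, and the `datacenter` value are all hypothetical, and the code targets Python 3 rather than the Python 2 used in this changeset.

```python
from urllib.parse import unquote


def parse_manifest(text):
    """Parse manifest text into a list of attribute dicts (hypothetical helper).

    Each non-blank line is a URL followed by space-delimited key=value
    pairs. Keys and values are URI-decoded, as the docstring specifies.
    """
    entries = []
    for line in text.splitlines():
        fields = line.split()
        if not fields:
            continue  # skip blank lines
        attrs = {"URL": fields[0]}
        for pair in fields[1:]:
            key, _, value = pair.partition("=")
            attrs[unquote(key)] = unquote(value)
        entries.append(attrs)
    return entries


def filter_supported(entries, supported_specs):
    """Drop entries whose BUNDLESPEC is not in supported_specs.

    Entries without a BUNDLESPEC are kept, since there is nothing to
    filter on (this is why the docstring recommends setting the key).
    """
    return [e for e in entries
            if "BUNDLESPEC" not in e or e["BUNDLESPEC"] in supported_specs]


# Example manifest with made-up URLs and a site-specific "datacenter" key.
manifest = (
    "https://cdn.example.com/full.hg BUNDLESPEC=gzip-v2 REQUIRESNI=true\n"
    "https://cdn.example.com/old.hg BUNDLESPEC=bzip2-v1 datacenter=us%2Dwest\n"
)
entries = parse_manifest(manifest)
usable = filter_supported(entries, {"gzip-v2"})
```

With this input, `entries` holds two dicts and `usable` keeps only the `gzip-v2` entry. A real client would additionally honor `REQUIRESNI` and sort the remaining entries by its configured preferences before fetching the first URL.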
@@ -1,1050 +1,1050
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import itertools
12 12 import os
13 13 import tempfile
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 bin,
18 18 hex,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 encoding,
25 25 error,
26 26 exchange,
27 27 peer,
28 28 pushkey as pushkeymod,
29 29 pycompat,
30 30 streamclone,
31 31 util,
32 32 )
33 33
34 34 urlerr = util.urlerr
35 35 urlreq = util.urlreq
36 36
37 37 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
38 38 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
39 39 'IncompatibleClient')
40 40 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
41 41
42 42 class abstractserverproto(object):
43 43 """abstract class that summarizes the protocol API
44 44
45 45 Used as reference and documentation.
46 46 """
47 47
48 48 def getargs(self, args):
49 49 """return the value for arguments in <args>
50 50
51 51 returns a list of values (same order as <args>)"""
52 52 raise NotImplementedError()
53 53
54 54 def getfile(self, fp):
55 55 """write the whole content of a file into a file like object
56 56
57 57 The file is in the form::
58 58
59 59 (<chunk-size>\n<chunk>)+0\n
60 60
61 61 chunk size is the ascii version of the int.
62 62 """
63 63 raise NotImplementedError()
64 64
65 65 def redirect(self):
66 66 """may setup interception for stdout and stderr
67 67
68 68 See also the `restore` method."""
69 69 raise NotImplementedError()
70 70
71 71 # If the `redirect` function does install interception, the `restore`
72 72 # function MUST be defined. If interception is not used, this function
73 73 # MUST NOT be defined.
74 74 #
75 75 # left commented here on purpose
76 76 #
77 77 #def restore(self):
78 78 # """reinstall previous stdout and stderr and return intercepted stdout
79 79 # """
80 80 # raise NotImplementedError()
81 81
82 82 class remotebatch(peer.batcher):
83 83 '''batches the queued calls; uses as few roundtrips as possible'''
84 84 def __init__(self, remote):
85 85 '''remote must support _submitbatch(encbatch) and
86 86 _submitone(op, encargs)'''
87 87 peer.batcher.__init__(self)
88 88 self.remote = remote
89 89 def submit(self):
90 90 req, rsp = [], []
91 91 for name, args, opts, resref in self.calls:
92 92 mtd = getattr(self.remote, name)
93 93 batchablefn = getattr(mtd, 'batchable', None)
94 94 if batchablefn is not None:
95 95 batchable = batchablefn(mtd.im_self, *args, **opts)
96 96 encargsorres, encresref = next(batchable)
97 97 if encresref:
98 98 req.append((name, encargsorres,))
99 99 rsp.append((batchable, encresref, resref,))
100 100 else:
101 101 resref.set(encargsorres)
102 102 else:
103 103 if req:
104 104 self._submitreq(req, rsp)
105 105 req, rsp = [], []
106 106 resref.set(mtd(*args, **opts))
107 107 if req:
108 108 self._submitreq(req, rsp)
109 109 def _submitreq(self, req, rsp):
110 110 encresults = self.remote._submitbatch(req)
111 111 for encres, r in zip(encresults, rsp):
112 112 batchable, encresref, resref = r
113 113 encresref.set(encres)
114 114 resref.set(next(batchable))
115 115
116 116 class remoteiterbatcher(peer.iterbatcher):
117 117 def __init__(self, remote):
118 118 super(remoteiterbatcher, self).__init__()
119 119 self._remote = remote
120 120
121 121 def __getattr__(self, name):
122 122 if not getattr(self._remote, name, False):
123 123 raise AttributeError(
124 124 'Attempted to iterbatch non-batchable call to %r' % name)
125 125 return super(remoteiterbatcher, self).__getattr__(name)
126 126
127 127 def submit(self):
128 128 """Break the batch request into many patch calls and pipeline them.
129 129
130 130 This is mostly valuable over http where request sizes can be
131 131 limited, but can be used in other places as well.
132 132 """
133 133 req, rsp = [], []
134 134 for name, args, opts, resref in self.calls:
135 135 mtd = getattr(self._remote, name)
136 136 batchable = mtd.batchable(mtd.im_self, *args, **opts)
137 137 encargsorres, encresref = next(batchable)
138 138 assert encresref
139 139 req.append((name, encargsorres))
140 140 rsp.append((batchable, encresref))
141 141 if req:
142 142 self._resultiter = self._remote._submitbatch(req)
143 143 self._rsp = rsp
144 144
145 145 def results(self):
146 146 for (batchable, encresref), encres in itertools.izip(
147 147 self._rsp, self._resultiter):
148 148 encresref.set(encres)
149 149 yield next(batchable)
150 150
151 151 # Forward a couple of names from peer to make wireproto interactions
152 152 # slightly more sensible.
153 153 batchable = peer.batchable
154 154 future = peer.future
155 155
156 156 # list of nodes encoding / decoding
157 157
158 158 def decodelist(l, sep=' '):
159 159 if l:
160 160 return map(bin, l.split(sep))
161 161 return []
162 162
163 163 def encodelist(l, sep=' '):
164 164 try:
165 165 return sep.join(map(hex, l))
166 166 except TypeError:
167 167 raise
168 168
169 169 # batched call argument encoding
170 170
171 171 def escapearg(plain):
172 172 return (plain
173 173 .replace(':', ':c')
174 174 .replace(',', ':o')
175 175 .replace(';', ':s')
176 176 .replace('=', ':e'))
177 177
178 178 def unescapearg(escaped):
179 179 return (escaped
180 180 .replace(':e', '=')
181 181 .replace(':s', ';')
182 182 .replace(':o', ',')
183 183 .replace(':c', ':'))
184 184
185 185 def encodebatchcmds(req):
186 186 """Return a ``cmds`` argument value for the ``batch`` command."""
187 187 cmds = []
188 188 for op, argsdict in req:
189 189 # Old servers didn't properly unescape argument names. So prevent
190 190 # the sending of argument names that may not be decoded properly by
191 191 # servers.
192 192 assert all(escapearg(k) == k for k in argsdict)
193 193
194 194 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
195 195 for k, v in argsdict.iteritems())
196 196 cmds.append('%s %s' % (op, args))
197 197
198 198 return ';'.join(cmds)
199 199
200 200 # mapping of options accepted by getbundle and their types
201 201 #
202 202 # Meant to be extended by extensions. It is the extension's responsibility to ensure
203 203 # such options are properly processed in exchange.getbundle.
204 204 #
205 205 # supported types are:
206 206 #
207 207 # :nodes: list of binary nodes
208 208 # :csv: list of comma-separated values
209 209 # :scsv: list of comma-separated values returned as a set
210 210 # :plain: string with no transformation needed.
211 211 gboptsmap = {'heads': 'nodes',
212 212 'common': 'nodes',
213 213 'obsmarkers': 'boolean',
214 214 'bundlecaps': 'scsv',
215 215 'listkeys': 'csv',
216 216 'cg': 'boolean',
217 217 'cbattempted': 'boolean'}
218 218
219 219 # client side
220 220
221 221 class wirepeer(peer.peerrepository):
222 222 """Client-side interface for communicating with a peer repository.
223 223
224 224 Methods commonly call wire protocol commands of the same name.
225 225
226 226 See also httppeer.py and sshpeer.py for protocol-specific
227 227 implementations of this interface.
228 228 """
229 229 def batch(self):
230 230 if self.capable('batch'):
231 231 return remotebatch(self)
232 232 else:
233 233 return peer.localbatch(self)
234 234 def _submitbatch(self, req):
235 235 """run batch request <req> on the server
236 236
237 237 Returns an iterator of the raw responses from the server.
238 238 """
239 239 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
240 240 chunk = rsp.read(1024)
241 241 work = [chunk]
242 242 while chunk:
243 243 while ';' not in chunk and chunk:
244 244 chunk = rsp.read(1024)
245 245 work.append(chunk)
246 246 merged = ''.join(work)
247 247 while ';' in merged:
248 248 one, merged = merged.split(';', 1)
249 249 yield unescapearg(one)
250 250 chunk = rsp.read(1024)
251 251 work = [merged, chunk]
252 252 yield unescapearg(''.join(work))
253 253
254 254 def _submitone(self, op, args):
255 255 return self._call(op, **args)
256 256
257 257 def iterbatch(self):
258 258 return remoteiterbatcher(self)
259 259
260 260 @batchable
261 261 def lookup(self, key):
262 262 self.requirecap('lookup', _('look up remote revision'))
263 263 f = future()
264 264 yield {'key': encoding.fromlocal(key)}, f
265 265 d = f.value
266 266 success, data = d[:-1].split(" ", 1)
267 267 if int(success):
268 268 yield bin(data)
269 269 self._abort(error.RepoError(data))
270 270
271 271 @batchable
272 272 def heads(self):
273 273 f = future()
274 274 yield {}, f
275 275 d = f.value
276 276 try:
277 277 yield decodelist(d[:-1])
278 278 except ValueError:
279 279 self._abort(error.ResponseError(_("unexpected response:"), d))
280 280
281 281 @batchable
282 282 def known(self, nodes):
283 283 f = future()
284 284 yield {'nodes': encodelist(nodes)}, f
285 285 d = f.value
286 286 try:
287 287 yield [bool(int(b)) for b in d]
288 288 except ValueError:
289 289 self._abort(error.ResponseError(_("unexpected response:"), d))
290 290
291 291 @batchable
292 292 def branchmap(self):
293 293 f = future()
294 294 yield {}, f
295 295 d = f.value
296 296 try:
297 297 branchmap = {}
298 298 for branchpart in d.splitlines():
299 299 branchname, branchheads = branchpart.split(' ', 1)
300 300 branchname = encoding.tolocal(urlreq.unquote(branchname))
301 301 branchheads = decodelist(branchheads)
302 302 branchmap[branchname] = branchheads
303 303 yield branchmap
304 304 except TypeError:
305 305 self._abort(error.ResponseError(_("unexpected response:"), d))
306 306
307 307 def branches(self, nodes):
308 308 n = encodelist(nodes)
309 309 d = self._call("branches", nodes=n)
310 310 try:
311 311 br = [tuple(decodelist(b)) for b in d.splitlines()]
312 312 return br
313 313 except ValueError:
314 314 self._abort(error.ResponseError(_("unexpected response:"), d))
315 315
316 316 def between(self, pairs):
317 317 batch = 8 # avoid giant requests
318 318 r = []
319 319 for i in xrange(0, len(pairs), batch):
320 320 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
321 321 d = self._call("between", pairs=n)
322 322 try:
323 323 r.extend(l and decodelist(l) or [] for l in d.splitlines())
324 324 except ValueError:
325 325 self._abort(error.ResponseError(_("unexpected response:"), d))
326 326 return r
327 327
328 328 @batchable
329 329 def pushkey(self, namespace, key, old, new):
330 330 if not self.capable('pushkey'):
331 331 yield False, None
332 332 f = future()
333 333 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
334 334 yield {'namespace': encoding.fromlocal(namespace),
335 335 'key': encoding.fromlocal(key),
336 336 'old': encoding.fromlocal(old),
337 337 'new': encoding.fromlocal(new)}, f
338 338 d = f.value
339 339 d, output = d.split('\n', 1)
340 340 try:
341 341 d = bool(int(d))
342 342 except ValueError:
343 343 raise error.ResponseError(
344 344 _('push failed (unexpected response):'), d)
345 345 for l in output.splitlines(True):
346 346 self.ui.status(_('remote: '), l)
347 347 yield d
348 348
349 349 @batchable
350 350 def listkeys(self, namespace):
351 351 if not self.capable('pushkey'):
352 352 yield {}, None
353 353 f = future()
354 354 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
355 355 yield {'namespace': encoding.fromlocal(namespace)}, f
356 356 d = f.value
357 357 self.ui.debug('received listkey for "%s": %i bytes\n'
358 358 % (namespace, len(d)))
359 359 yield pushkeymod.decodekeys(d)
360 360
361 361 def stream_out(self):
362 362 return self._callstream('stream_out')
363 363
364 364 def changegroup(self, nodes, kind):
365 365 n = encodelist(nodes)
366 366 f = self._callcompressable("changegroup", roots=n)
367 367 return changegroupmod.cg1unpacker(f, 'UN')
368 368
369 369 def changegroupsubset(self, bases, heads, kind):
370 370 self.requirecap('changegroupsubset', _('look up remote changes'))
371 371 bases = encodelist(bases)
372 372 heads = encodelist(heads)
373 373 f = self._callcompressable("changegroupsubset",
374 374 bases=bases, heads=heads)
375 375 return changegroupmod.cg1unpacker(f, 'UN')
376 376
377 377 def getbundle(self, source, **kwargs):
378 378 self.requirecap('getbundle', _('look up remote changes'))
379 379 opts = {}
380 380 bundlecaps = kwargs.get('bundlecaps')
381 381 if bundlecaps is not None:
382 382 kwargs['bundlecaps'] = sorted(bundlecaps)
383 383 else:
384 384 bundlecaps = () # kwargs could have it to None
385 385 for key, value in kwargs.iteritems():
386 386 if value is None:
387 387 continue
388 388 keytype = gboptsmap.get(key)
389 389 if keytype is None:
390 390 assert False, 'unexpected'
391 391 elif keytype == 'nodes':
392 392 value = encodelist(value)
393 393 elif keytype in ('csv', 'scsv'):
394 394 value = ','.join(value)
395 395 elif keytype == 'boolean':
396 396 value = '%i' % bool(value)
397 397 elif keytype != 'plain':
398 398 raise KeyError('unknown getbundle option type %s'
399 399 % keytype)
400 400 opts[key] = value
401 401 f = self._callcompressable("getbundle", **opts)
402 402 if any((cap.startswith('HG2') for cap in bundlecaps)):
403 403 return bundle2.getunbundler(self.ui, f)
404 404 else:
405 405 return changegroupmod.cg1unpacker(f, 'UN')
406 406
407 407 def unbundle(self, cg, heads, url):
408 408 '''Send cg (a readable file-like object representing the
409 409 changegroup to push, typically a chunkbuffer object) to the
410 410 remote server as a bundle.
411 411
412 412 When pushing a bundle10 stream, return an integer indicating the
413 413 result of the push (see localrepository.addchangegroup()).
414 414
415 415 When pushing a bundle20 stream, return a bundle20 stream.
416 416
417 417 `url` is the url the client thinks it's pushing to, which is
418 418 visible to hooks.
419 419 '''
420 420
421 421 if heads != ['force'] and self.capable('unbundlehash'):
422 422 heads = encodelist(['hashed',
423 423 hashlib.sha1(''.join(sorted(heads))).digest()])
424 424 else:
425 425 heads = encodelist(heads)
426 426
427 427 if util.safehasattr(cg, 'deltaheader'):
428 428 # this is a bundle10, do the old style call sequence
429 429 ret, output = self._callpush("unbundle", cg, heads=heads)
430 430 if ret == "":
431 431 raise error.ResponseError(
432 432 _('push failed:'), output)
433 433 try:
434 434 ret = int(ret)
435 435 except ValueError:
436 436 raise error.ResponseError(
437 437 _('push failed (unexpected response):'), ret)
438 438
439 439 for l in output.splitlines(True):
440 440 self.ui.status(_('remote: '), l)
441 441 else:
442 442 # bundle2 push. Send a stream, fetch a stream.
443 443 stream = self._calltwowaystream('unbundle', cg, heads=heads)
444 444 ret = bundle2.getunbundler(self.ui, stream)
445 445 return ret
446 446
447 447 def debugwireargs(self, one, two, three=None, four=None, five=None):
448 448 # don't pass optional arguments left at their default value
449 449 opts = {}
450 450 if three is not None:
451 451 opts['three'] = three
452 452 if four is not None:
453 453 opts['four'] = four
454 454 return self._call('debugwireargs', one=one, two=two, **opts)
455 455
456 456 def _call(self, cmd, **args):
457 457 """execute <cmd> on the server
458 458
459 459 The command is expected to return a simple string.
460 460
461 461 returns the server reply as a string."""
462 462 raise NotImplementedError()
463 463
464 464 def _callstream(self, cmd, **args):
465 465 """execute <cmd> on the server
466 466
467 467 The command is expected to return a stream. Note that if the
468 468 command doesn't return a stream, _callstream behaves
469 469 differently for ssh and http peers.
470 470
471 471 returns the server reply as a file like object.
472 472 """
473 473 raise NotImplementedError()
474 474
475 475 def _callcompressable(self, cmd, **args):
476 476 """execute <cmd> on the server
477 477
478 478 The command is expected to return a stream.
479 479
480 480 The stream may have been compressed in some implementations. This
481 481 function takes care of the decompression. This is the only difference
482 482 with _callstream.
483 483
484 484 returns the server reply as a file like object.
485 485 """
486 486 raise NotImplementedError()
487 487
488 488 def _callpush(self, cmd, fp, **args):
489 489 """execute a <cmd> on server
490 490
491 491 The command is expected to be related to a push. Push has a special
492 492 return method.
493 493
494 494 returns the server reply as a (ret, output) tuple. ret is either
495 495 empty (error) or a stringified int.
496 496 """
497 497 raise NotImplementedError()
498 498
499 499 def _calltwowaystream(self, cmd, fp, **args):
500 500 """execute <cmd> on server
501 501
502 502 The command will send a stream to the server and get a stream in reply.
503 503 """
504 504 raise NotImplementedError()
505 505
506 506 def _abort(self, exception):
507 507 """clearly abort the wire protocol connection and raise the exception
508 508 """
509 509 raise NotImplementedError()
510 510
511 511 # server side
512 512
513 513 # wire protocol command can either return a string or one of these classes.
514 514 class streamres(object):
515 515 """wireproto reply: binary stream
516 516
517 517 The call was successful and the result is a stream.
518 518
519 519 Accepts either a generator or an object with a ``read(size)`` method.
520 520
521 521 ``v1compressible`` indicates whether this data can be compressed to
522 522 "version 1" clients (technically: HTTP peers using
523 523 application/mercurial-0.1 media type). This flag should NOT be used on
524 524 new commands because new clients should support a more modern compression
525 525 mechanism.
526 526 """
527 527 def __init__(self, gen=None, reader=None, v1compressible=False):
528 528 self.gen = gen
529 529 self.reader = reader
530 530 self.v1compressible = v1compressible
531 531
532 532 class pushres(object):
533 533 """wireproto reply: success with simple integer return
534 534
535 535 The call was successful and returned an integer contained in `self.res`.
536 536 """
537 537 def __init__(self, res):
538 538 self.res = res
539 539
540 540 class pusherr(object):
541 541 """wireproto reply: failure
542 542
543 543 The call failed. The `self.res` attribute contains the error message.
544 544 """
545 545 def __init__(self, res):
546 546 self.res = res
547 547
548 548 class ooberror(object):
549 549 """wireproto reply: failure of a batch of operations
550 550
551 551 Something failed during a batch call. The error message is stored in
552 552 `self.message`.
553 553 """
554 554 def __init__(self, message):
555 555 self.message = message
556 556
557 557 def getdispatchrepo(repo, proto, command):
558 558 """Obtain the repo used for processing wire protocol commands.
559 559
560 560 The intent of this function is to serve as a monkeypatch point for
561 561 extensions that need commands to operate on different repo views under
562 562 specialized circumstances.
563 563 """
564 564 return repo.filtered('served')
565 565
566 566 def dispatch(repo, proto, command):
567 567 repo = getdispatchrepo(repo, proto, command)
568 568 func, spec = commands[command]
569 569 args = proto.getargs(spec)
570 570 return func(repo, proto, *args)
571 571
572 572 def options(cmd, keys, others):
573 573 opts = {}
574 574 for k in keys:
575 575 if k in others:
576 576 opts[k] = others[k]
577 577 del others[k]
578 578 if others:
579 579 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
580 580 % (cmd, ",".join(others)))
581 581 return opts
582 582
583 583 def bundle1allowed(repo, action):
584 584 """Whether a bundle1 operation is allowed from the server.
585 585
586 586 Priority is:
587 587
588 588 1. server.bundle1gd.<action> (if generaldelta active)
589 589 2. server.bundle1.<action>
590 590 3. server.bundle1gd (if generaldelta active)
591 591 4. server.bundle1
592 592 """
593 593 ui = repo.ui
594 594 gd = 'generaldelta' in repo.requirements
595 595
596 596 if gd:
597 597 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
598 598 if v is not None:
599 599 return v
600 600
601 601 v = ui.configbool('server', 'bundle1.%s' % action, None)
602 602 if v is not None:
603 603 return v
604 604
605 605 if gd:
606 606 v = ui.configbool('server', 'bundle1gd', None)
607 607 if v is not None:
608 608 return v
609 609
610 610 return ui.configbool('server', 'bundle1', True)
611 611
612 612 def supportedcompengines(ui, proto, role):
613 613 """Obtain the list of supported compression engines for a request."""
614 614 assert role in (util.CLIENTROLE, util.SERVERROLE)
615 615
616 616 compengines = util.compengines.supportedwireengines(role)
617 617
618 618 # Allow config to override default list and ordering.
619 619 if role == util.SERVERROLE:
620 620 configengines = ui.configlist('server', 'compressionengines')
621 621 config = 'server.compressionengines'
622 622 else:
623 623 # This is currently implemented mainly to facilitate testing. In most
624 624 # cases, the server should be in charge of choosing a compression engine
625 625 # because a server has the most to lose from a sub-optimal choice. (e.g.
626 626 # CPU DoS due to an expensive engine or a network DoS due to poor
627 627 # compression ratio).
628 628 configengines = ui.configlist('experimental',
629 629 'clientcompressionengines')
630 630 config = 'experimental.clientcompressionengines'
631 631
632 632 # No explicit config. Filter out the ones that aren't supposed to be
633 633 # advertised and return default ordering.
634 634 if not configengines:
635 635 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
636 636 return [e for e in compengines
637 637 if getattr(e.wireprotosupport(), attr) > 0]
638 638
639 639 # If compression engines are listed in the config, assume there is a good
640 640 # reason for it (like server operators wanting to achieve specific
641 641 # performance characteristics). So fail fast if the config references
642 642 # unusable compression engines.
643 643 validnames = set(e.name() for e in compengines)
644 644 invalidnames = set(e for e in configengines if e not in validnames)
645 645 if invalidnames:
646 646 raise error.Abort(_('invalid compression engine defined in %s: %s') %
647 647 (config, ', '.join(sorted(invalidnames))))
648 648
649 649 compengines = [e for e in compengines if e.name() in configengines]
650 650 compengines = sorted(compengines,
651 651 key=lambda e: configengines.index(e.name()))
652 652
653 653 if not compengines:
654 654 raise error.Abort(_('%s config option does not specify any known '
655 655 'compression engines') % config,
656 656 hint=_('usable compression engines: %s') %
657 657 ', '.join(sorted(validnames)))
658 658
659 659 return compengines
660 660
661 661 # list of commands
662 662 commands = {}
663 663
664 664 def wireprotocommand(name, args=''):
665 665 """decorator for wire protocol command"""
666 666 def register(func):
667 667 commands[name] = (func, args)
668 668 return func
669 669 return register
670 670
671 671 @wireprotocommand('batch', 'cmds *')
672 672 def batch(repo, proto, cmds, others):
673 673 repo = repo.filtered("served")
674 674 res = []
675 675 for pair in cmds.split(';'):
676 676 op, args = pair.split(' ', 1)
677 677 vals = {}
678 678 for a in args.split(','):
679 679 if a:
680 680 n, v = a.split('=')
681 681 vals[unescapearg(n)] = unescapearg(v)
682 682 func, spec = commands[op]
683 683 if spec:
684 684 keys = spec.split()
685 685 data = {}
686 686 for k in keys:
687 687 if k == '*':
688 688 star = {}
689 689 for key in vals.keys():
690 690 if key not in keys:
691 691 star[key] = vals[key]
692 692 data['*'] = star
693 693 else:
694 694 data[k] = vals[k]
695 695 result = func(repo, proto, *[data[k] for k in keys])
696 696 else:
697 697 result = func(repo, proto)
698 698 if isinstance(result, ooberror):
699 699 return result
700 700 res.append(escapearg(result))
701 701 return ';'.join(res)
702 702
703 703 @wireprotocommand('between', 'pairs')
704 704 def between(repo, proto, pairs):
705 705 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
706 706 r = []
707 707 for b in repo.between(pairs):
708 708 r.append(encodelist(b) + "\n")
709 709 return "".join(r)
710 710
711 711 @wireprotocommand('branchmap')
712 712 def branchmap(repo, proto):
713 713 branchmap = repo.branchmap()
714 714 heads = []
715 715 for branch, nodes in branchmap.iteritems():
716 716 branchname = urlreq.quote(encoding.fromlocal(branch))
717 717 branchnodes = encodelist(nodes)
718 718 heads.append('%s %s' % (branchname, branchnodes))
719 719 return '\n'.join(heads)
720 720
721 721 @wireprotocommand('branches', 'nodes')
722 722 def branches(repo, proto, nodes):
723 723 nodes = decodelist(nodes)
724 724 r = []
725 725 for b in repo.branches(nodes):
726 726 r.append(encodelist(b) + "\n")
727 727 return "".join(r)
728 728
729 729 @wireprotocommand('clonebundles', '')
730 730 def clonebundles(repo, proto):
731 731 """Server command for returning info for available bundles to seed clones.
732 732
733 733 Clients will parse this response and determine what bundle to fetch.
734 734
735 735 Extensions may wrap this command to filter or dynamically emit data
736 736 depending on the request. e.g. you could advertise URLs for the closest
737 737 data center given the client's IP address.
738 738 """
739 - return repo.opener.tryread('clonebundles.manifest')
739 + return repo.vfs.tryread('clonebundles.manifest')
740 740
741 741 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
742 742 'known', 'getbundle', 'unbundlehash', 'batch']
743 743
744 744 def _capabilities(repo, proto):
745 745 """return a list of capabilities for a repo
746 746
747 747 This function exists to allow extensions to easily wrap capabilities
748 748 computation
749 749
750 750 - returns a list: easy to alter
751 751 - changes made here are propagated to both the `capabilities` and `hello`
752 752 commands without any other action needed.
753 753 """
754 754 # copy to prevent modification of the global list
755 755 caps = list(wireprotocaps)
756 756 if streamclone.allowservergeneration(repo.ui):
757 757 if repo.ui.configbool('server', 'preferuncompressed', False):
758 758 caps.append('stream-preferred')
759 759 requiredformats = repo.requirements & repo.supportedformats
760 760 # if our local revlogs are just revlogv1, add 'stream' cap
761 761 if not requiredformats - set(('revlogv1',)):
762 762 caps.append('stream')
763 763 # otherwise, add 'streamreqs' detailing our local revlog format
764 764 else:
765 765 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
766 766 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
767 767 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
768 768 caps.append('bundle2=' + urlreq.quote(capsblob))
769 769 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
770 770
771 771 if proto.name == 'http':
772 772 caps.append('httpheader=%d' %
773 773 repo.ui.configint('server', 'maxhttpheaderlen', 1024))
774 774 if repo.ui.configbool('experimental', 'httppostargs', False):
775 775 caps.append('httppostargs')
776 776
777 777 # FUTURE advertise 0.2rx once support is implemented
778 778 # FUTURE advertise minrx and mintx after consulting config option
779 779 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
780 780
781 781 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
782 782 if compengines:
783 783 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
784 784 for e in compengines)
785 785 caps.append('compression=%s' % comptypes)
786 786
787 787 return caps
788 788
789 789 # If you are writing an extension and are considering wrapping this function,
790 790 # wrap `_capabilities` instead.
791 791 @wireprotocommand('capabilities')
792 792 def capabilities(repo, proto):
793 793 return ' '.join(_capabilities(repo, proto))
794 794
795 795 @wireprotocommand('changegroup', 'roots')
796 796 def changegroup(repo, proto, roots):
797 797 nodes = decodelist(roots)
798 798 cg = changegroupmod.changegroup(repo, nodes, 'serve')
799 799 return streamres(reader=cg, v1compressible=True)
800 800
801 801 @wireprotocommand('changegroupsubset', 'bases heads')
802 802 def changegroupsubset(repo, proto, bases, heads):
803 803 bases = decodelist(bases)
804 804 heads = decodelist(heads)
805 805 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
806 806 return streamres(reader=cg, v1compressible=True)
807 807
808 808 @wireprotocommand('debugwireargs', 'one two *')
809 809 def debugwireargs(repo, proto, one, two, others):
810 810 # only accept optional args from the known set
811 811 opts = options('debugwireargs', ['three', 'four'], others)
812 812 return repo.debugwireargs(one, two, **opts)
813 813
814 814 @wireprotocommand('getbundle', '*')
815 815 def getbundle(repo, proto, others):
816 816 opts = options('getbundle', gboptsmap.keys(), others)
817 817 for k, v in opts.iteritems():
818 818 keytype = gboptsmap[k]
819 819 if keytype == 'nodes':
820 820 opts[k] = decodelist(v)
821 821 elif keytype == 'csv':
822 822 opts[k] = list(v.split(','))
823 823 elif keytype == 'scsv':
824 824 opts[k] = set(v.split(','))
825 825 elif keytype == 'boolean':
826 826 # Client should serialize False as '0', which is a non-empty string
827 827 # so it evaluates as a True bool.
828 828 if v == '0':
829 829 opts[k] = False
830 830 else:
831 831 opts[k] = bool(v)
832 832 elif keytype != 'plain':
833 833 raise KeyError('unknown getbundle option type %s'
834 834 % keytype)
835 835
836 836 if not bundle1allowed(repo, 'pull'):
837 837 if not exchange.bundle2requested(opts.get('bundlecaps')):
838 838 if proto.name == 'http':
839 839 return ooberror(bundle2required)
840 840 raise error.Abort(bundle2requiredmain,
841 841 hint=bundle2requiredhint)
842 842
843 843 try:
844 844 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
845 845 except error.Abort as exc:
846 846 # cleanly forward Abort error to the client
847 847 if not exchange.bundle2requested(opts.get('bundlecaps')):
848 848 if proto.name == 'http':
849 849 return ooberror(str(exc) + '\n')
850 850 raise # cannot do better for bundle1 + ssh
851 851 # a bundle2 request expects a bundle2 reply
852 852 bundler = bundle2.bundle20(repo.ui)
853 853 manargs = [('message', str(exc))]
854 854 advargs = []
855 855 if exc.hint is not None:
856 856 advargs.append(('hint', exc.hint))
857 857 bundler.addpart(bundle2.bundlepart('error:abort',
858 858 manargs, advargs))
859 859 return streamres(gen=bundler.getchunks(), v1compressible=True)
860 860 return streamres(gen=chunks, v1compressible=True)
861 861
862 862 @wireprotocommand('heads')
863 863 def heads(repo, proto):
864 864 h = repo.heads()
865 865 return encodelist(h) + "\n"
866 866
867 867 @wireprotocommand('hello')
868 868 def hello(repo, proto):
869 869 '''the hello command returns a set of lines describing various
870 870 interesting things about the server, in an RFC822-like format.
871 871 Currently the only one defined is "capabilities", which
872 872 consists of a line in the form:
873 873
874 874 capabilities: space separated list of tokens
875 875 '''
876 876 return "capabilities: %s\n" % (capabilities(repo, proto))
877 877
878 878 @wireprotocommand('listkeys', 'namespace')
879 879 def listkeys(repo, proto, namespace):
880 880 d = repo.listkeys(encoding.tolocal(namespace)).items()
881 881 return pushkeymod.encodekeys(d)
882 882
883 883 @wireprotocommand('lookup', 'key')
884 884 def lookup(repo, proto, key):
885 885 try:
886 886 k = encoding.tolocal(key)
887 887 c = repo[k]
888 888 r = c.hex()
889 889 success = 1
890 890 except Exception as inst:
891 891 r = str(inst)
892 892 success = 0
893 893 return "%s %s\n" % (success, r)
894 894
895 895 @wireprotocommand('known', 'nodes *')
896 896 def known(repo, proto, nodes, others):
897 897 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
898 898
899 899 @wireprotocommand('pushkey', 'namespace key old new')
900 900 def pushkey(repo, proto, namespace, key, old, new):
901 901 # compatibility with pre-1.8 clients which were accidentally
902 902 # sending raw binary nodes rather than utf-8-encoded hex
903 903 if len(new) == 20 and new.encode('string-escape') != new:
904 904 # looks like it could be a binary node
905 905 try:
906 906 new.decode('utf-8')
907 907 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
908 908 except UnicodeDecodeError:
909 909 pass # binary, leave unmodified
910 910 else:
911 911 new = encoding.tolocal(new) # normal path
912 912
913 913 if util.safehasattr(proto, 'restore'):
914 914
915 915 proto.redirect()
916 916
917 917 try:
918 918 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
919 919 encoding.tolocal(old), new) or False
920 920 except error.Abort:
921 921 r = False
922 922
923 923 output = proto.restore()
924 924
925 925 return '%s\n%s' % (int(r), output)
926 926
927 927 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
928 928 encoding.tolocal(old), new)
929 929 return '%s\n' % int(r)
930 930
931 931 @wireprotocommand('stream_out')
932 932 def stream(repo, proto):
933 933 '''If the server supports streaming clone, it advertises the "stream"
934 934 capability with a value representing the version and flags of the repo
935 935 it is serving. Client checks to see if it understands the format.
936 936 '''
937 937 if not streamclone.allowservergeneration(repo.ui):
938 938 return '1\n'
939 939
940 940 def getstream(it):
941 941 yield '0\n'
942 942 for chunk in it:
943 943 yield chunk
944 944
945 945 try:
946 946 # LockError may be raised before the first result is yielded. Don't
947 947 # emit output until we're sure we got the lock successfully.
948 948 it = streamclone.generatev1wireproto(repo)
949 949 return streamres(gen=getstream(it))
950 950 except error.LockError:
951 951 return '2\n'
952 952
953 953 @wireprotocommand('unbundle', 'heads')
954 954 def unbundle(repo, proto, heads):
955 955 their_heads = decodelist(heads)
956 956
957 957 try:
958 958 proto.redirect()
959 959
960 960 exchange.check_heads(repo, their_heads, 'preparing changes')
961 961
962 962 # write bundle data to temporary file because it can be big
963 963 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
964 964 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
965 965 r = 0
966 966 try:
967 967 proto.getfile(fp)
968 968 fp.seek(0)
969 969 gen = exchange.readbundle(repo.ui, fp, None)
970 970 if (isinstance(gen, changegroupmod.cg1unpacker)
971 971 and not bundle1allowed(repo, 'push')):
972 972 if proto.name == 'http':
973 973 # need to special-case http because stderr does not get to
974 974 # the http client on a failed push, so we need to abuse some
975 975 # other error type to make sure the message gets to the
976 976 # user.
977 977 return ooberror(bundle2required)
978 978 raise error.Abort(bundle2requiredmain,
979 979 hint=bundle2requiredhint)
980 980
981 981 r = exchange.unbundle(repo, gen, their_heads, 'serve',
982 982 proto._client())
983 983 if util.safehasattr(r, 'addpart'):
984 984 # The return looks streamable, we are in the bundle2 case and
985 985 # should return a stream.
986 986 return streamres(gen=r.getchunks())
987 987 return pushres(r)
988 988
989 989 finally:
990 990 fp.close()
991 991 os.unlink(tempname)
992 992
993 993 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
994 994 # handle non-bundle2 case first
995 995 if not getattr(exc, 'duringunbundle2', False):
996 996 try:
997 997 raise
998 998 except error.Abort:
999 999 # The old code we moved used util.stderr directly.
1000 1000 # We did not change it to minimise code change.
1001 1001 # This needs to be moved to something proper.
1002 1002 # Feel free to do it.
1003 1003 util.stderr.write("abort: %s\n" % exc)
1004 1004 if exc.hint is not None:
1005 1005 util.stderr.write("(%s)\n" % exc.hint)
1006 1006 return pushres(0)
1007 1007 except error.PushRaced:
1008 1008 return pusherr(str(exc))
1009 1009
1010 1010 bundler = bundle2.bundle20(repo.ui)
1011 1011 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1012 1012 bundler.addpart(out)
1013 1013 try:
1014 1014 try:
1015 1015 raise
1016 1016 except error.PushkeyFailed as exc:
1017 1017 # check client caps
1018 1018 remotecaps = getattr(exc, '_replycaps', None)
1019 1019 if (remotecaps is not None
1020 1020 and 'pushkey' not in remotecaps.get('error', ())):
1021 1021 # no support on the remote side, fall back to the Abort handler.
1022 1022 raise
1023 1023 part = bundler.newpart('error:pushkey')
1024 1024 part.addparam('in-reply-to', exc.partid)
1025 1025 if exc.namespace is not None:
1026 1026 part.addparam('namespace', exc.namespace, mandatory=False)
1027 1027 if exc.key is not None:
1028 1028 part.addparam('key', exc.key, mandatory=False)
1029 1029 if exc.new is not None:
1030 1030 part.addparam('new', exc.new, mandatory=False)
1031 1031 if exc.old is not None:
1032 1032 part.addparam('old', exc.old, mandatory=False)
1033 1033 if exc.ret is not None:
1034 1034 part.addparam('ret', exc.ret, mandatory=False)
1035 1035 except error.BundleValueError as exc:
1036 1036 errpart = bundler.newpart('error:unsupportedcontent')
1037 1037 if exc.parttype is not None:
1038 1038 errpart.addparam('parttype', exc.parttype)
1039 1039 if exc.params:
1040 1040 errpart.addparam('params', '\0'.join(exc.params))
1041 1041 except error.Abort as exc:
1042 1042 manargs = [('message', str(exc))]
1043 1043 advargs = []
1044 1044 if exc.hint is not None:
1045 1045 advargs.append(('hint', exc.hint))
1046 1046 bundler.addpart(bundle2.bundlepart('error:abort',
1047 1047 manargs, advargs))
1048 1048 except error.PushRaced as exc:
1049 1049 bundler.newpart('error:pushraced', [('message', str(exc))])
1050 1050 return streamres(gen=bundler.getchunks())
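
Review note on the one changed line (739): the `clonebundles` command depends on `tryread` returning an empty string, rather than raising, when `clonebundles.manifest` does not exist. The sketch below imitates that behaviour so it can be checked in isolation. `SketchVfs` is a hypothetical stand-in written for this note, not Mercurial's vfs class; the manifest line and its URL are made up; and the sketch targets Python 3 with bytes, whereas the module in this diff is Python 2.

```python
import errno
import os
import tempfile


class SketchVfs(object):
    """Hypothetical stand-in for a vfs rooted at one directory."""

    def __init__(self, base):
        self.base = base

    def read(self, path):
        # Read the whole file relative to the vfs root.
        with open(os.path.join(self.base, path), 'rb') as fp:
            return fp.read()

    def tryread(self, path):
        """Return the file content, or empty bytes if the file is missing."""
        try:
            return self.read(path)
        except IOError as inst:
            # Only a missing file is tolerated; other errors propagate.
            if inst.errno != errno.ENOENT:
                raise
        return b''


def clonebundles(vfs):
    # Same shape as the command body after this change, minus repo/proto.
    return vfs.tryread('clonebundles.manifest')


root = tempfile.mkdtemp(prefix='sketch-vfs-')
vfs = SketchVfs(root)

# No manifest yet: the command answers with an empty payload.
missing = clonebundles(vfs)

# Once a manifest exists, its content is returned verbatim.
with open(os.path.join(root, 'clonebundles.manifest'), 'wb') as fp:
    fp.write(b'https://example.com/full.hg BUNDLESPEC=gzip-v2\n')
present = clonebundles(vfs)
```

Under these assumptions, a server with no manifest file answers the command with an empty response instead of an error, which is the behaviour that should be unchanged by switching from `repo.opener` to `repo.vfs`.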