##// END OF EJS Templates
streamclone: move wire protocol status code from wireproto command...
Gregory Szorc -
r35507:ded3a63f default
parent child Browse files
Show More
@@ -1,414 +1,430 b''
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import struct
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 branchmap,
15 15 error,
16 16 phases,
17 17 store,
18 18 util,
19 19 )
20 20
21 21 def canperformstreamclone(pullop, bailifbundle2supported=False):
22 22 """Whether it is possible to perform a streaming clone as part of pull.
23 23
24 24 ``bailifbundle2supported`` will cause the function to return False if
25 25 bundle2 stream clones are supported. It should only be called by the
26 26 legacy stream clone code path.
27 27
28 28 Returns a tuple of (supported, requirements). ``supported`` is True if
29 29 streaming clone is supported and False otherwise. ``requirements`` is
30 30 a set of repo requirements from the remote, or ``None`` if stream clone
31 31 isn't supported.
32 32 """
33 33 repo = pullop.repo
34 34 remote = pullop.remote
35 35
36 36 bundle2supported = False
37 37 if pullop.canusebundle2:
38 38 if 'v1' in pullop.remotebundle2caps.get('stream', []):
39 39 bundle2supported = True
40 40 # else
41 41 # Server doesn't support bundle2 stream clone or doesn't support
42 42 # the versions we support. Fall back and possibly allow legacy.
43 43
44 44 # Ensures legacy code path uses available bundle2.
45 45 if bailifbundle2supported and bundle2supported:
46 46 return False, None
47 47 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
48 48 #elif not bailifbundle2supported and not bundle2supported:
49 49 # return False, None
50 50
51 51 # Streaming clone only works on empty repositories.
52 52 if len(repo):
53 53 return False, None
54 54
55 55 # Streaming clone only works if all data is being requested.
56 56 if pullop.heads:
57 57 return False, None
58 58
59 59 streamrequested = pullop.streamclonerequested
60 60
61 61 # If we don't have a preference, let the server decide for us. This
62 62 # likely only comes into play in LANs.
63 63 if streamrequested is None:
64 64 # The server can advertise whether to prefer streaming clone.
65 65 streamrequested = remote.capable('stream-preferred')
66 66
67 67 if not streamrequested:
68 68 return False, None
69 69
70 70 # In order for stream clone to work, the client has to support all the
71 71 # requirements advertised by the server.
72 72 #
73 73 # The server advertises its requirements via the "stream" and "streamreqs"
74 74 # capability. "stream" (a value-less capability) is advertised if and only
75 75 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
76 76 # is advertised and contains a comma-delimited list of requirements.
77 77 requirements = set()
78 78 if remote.capable('stream'):
79 79 requirements.add('revlogv1')
80 80 else:
81 81 streamreqs = remote.capable('streamreqs')
82 82 # This is weird and shouldn't happen with modern servers.
83 83 if not streamreqs:
84 84 pullop.repo.ui.warn(_(
85 85 'warning: stream clone requested but server has them '
86 86 'disabled\n'))
87 87 return False, None
88 88
89 89 streamreqs = set(streamreqs.split(','))
90 90 # Server requires something we don't support. Bail.
91 91 missingreqs = streamreqs - repo.supportedformats
92 92 if missingreqs:
93 93 pullop.repo.ui.warn(_(
94 94 'warning: stream clone requested but client is missing '
95 95 'requirements: %s\n') % ', '.join(sorted(missingreqs)))
96 96 pullop.repo.ui.warn(
97 97 _('(see https://www.mercurial-scm.org/wiki/MissingRequirement '
98 98 'for more information)\n'))
99 99 return False, None
100 100 requirements = streamreqs
101 101
102 102 return True, requirements
103 103
104 104 def maybeperformlegacystreamclone(pullop):
105 105 """Possibly perform a legacy stream clone operation.
106 106
107 107 Legacy stream clones are performed as part of pull but before all other
108 108 operations.
109 109
110 110 A legacy stream clone will not be performed if a bundle2 stream clone is
111 111 supported.
112 112 """
113 113 supported, requirements = canperformstreamclone(pullop)
114 114
115 115 if not supported:
116 116 return
117 117
118 118 repo = pullop.repo
119 119 remote = pullop.remote
120 120
121 121 # Save remote branchmap. We will use it later to speed up branchcache
122 122 # creation.
123 123 rbranchmap = None
124 124 if remote.capable('branchmap'):
125 125 rbranchmap = remote.branchmap()
126 126
127 127 repo.ui.status(_('streaming all changes\n'))
128 128
129 129 fp = remote.stream_out()
130 130 l = fp.readline()
131 131 try:
132 132 resp = int(l)
133 133 except ValueError:
134 134 raise error.ResponseError(
135 135 _('unexpected response from remote server:'), l)
136 136 if resp == 1:
137 137 raise error.Abort(_('operation forbidden by server'))
138 138 elif resp == 2:
139 139 raise error.Abort(_('locking the remote repository failed'))
140 140 elif resp != 0:
141 141 raise error.Abort(_('the server sent an unknown error code'))
142 142
143 143 l = fp.readline()
144 144 try:
145 145 filecount, bytecount = map(int, l.split(' ', 1))
146 146 except (ValueError, TypeError):
147 147 raise error.ResponseError(
148 148 _('unexpected response from remote server:'), l)
149 149
150 150 with repo.lock():
151 151 consumev1(repo, fp, filecount, bytecount)
152 152
153 153 # new requirements = old non-format requirements +
154 154 # new format-related remote requirements
155 155 # requirements from the streamed-in repository
156 156 repo.requirements = requirements | (
157 157 repo.requirements - repo.supportedformats)
158 158 repo._applyopenerreqs()
159 159 repo._writerequirements()
160 160
161 161 if rbranchmap:
162 162 branchmap.replacecache(repo, rbranchmap)
163 163
164 164 repo.invalidate()
165 165
166 166 def allowservergeneration(repo):
167 167 """Whether streaming clones are allowed from the server."""
168 168 if not repo.ui.configbool('server', 'uncompressed', untrusted=True):
169 169 return False
170 170
171 171 # The way stream clone works makes it impossible to hide secret changesets.
172 172 # So don't allow this by default.
173 173 secret = phases.hassecret(repo)
174 174 if secret:
175 175 return repo.ui.configbool('server', 'uncompressedallowsecret')
176 176
177 177 return True
178 178
179 179 # This is it's own function so extensions can override it.
180 180 def _walkstreamfiles(repo):
181 181 return repo.store.walk()
182 182
183 183 def generatev1(repo):
184 184 """Emit content for version 1 of a streaming clone.
185 185
186 186 This returns a 3-tuple of (file count, byte size, data iterator).
187 187
188 188 The data iterator consists of N entries for each file being transferred.
189 189 Each file entry starts as a line with the file name and integer size
190 190 delimited by a null byte.
191 191
192 192 The raw file data follows. Following the raw file data is the next file
193 193 entry, or EOF.
194 194
195 195 When used on the wire protocol, an additional line indicating protocol
196 196 success will be prepended to the stream. This function is not responsible
197 197 for adding it.
198 198
199 199 This function will obtain a repository lock to ensure a consistent view of
200 200 the store is captured. It therefore may raise LockError.
201 201 """
202 202 entries = []
203 203 total_bytes = 0
204 204 # Get consistent snapshot of repo, lock during scan.
205 205 with repo.lock():
206 206 repo.ui.debug('scanning\n')
207 207 for name, ename, size in _walkstreamfiles(repo):
208 208 if size:
209 209 entries.append((name, size))
210 210 total_bytes += size
211 211
212 212 repo.ui.debug('%d files, %d bytes to transfer\n' %
213 213 (len(entries), total_bytes))
214 214
215 215 svfs = repo.svfs
216 216 debugflag = repo.ui.debugflag
217 217
218 218 def emitrevlogdata():
219 219 for name, size in entries:
220 220 if debugflag:
221 221 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
222 222 # partially encode name over the wire for backwards compat
223 223 yield '%s\0%d\n' % (store.encodedir(name), size)
224 224 # auditing at this stage is both pointless (paths are already
225 225 # trusted by the local repo) and expensive
226 226 with svfs(name, 'rb', auditpath=False) as fp:
227 227 if size <= 65536:
228 228 yield fp.read(size)
229 229 else:
230 230 for chunk in util.filechunkiter(fp, limit=size):
231 231 yield chunk
232 232
233 233 return len(entries), total_bytes, emitrevlogdata()
234 234
235 235 def generatev1wireproto(repo):
236 236 """Emit content for version 1 of streaming clone suitable for the wire.
237 237
238 This is the data output from ``generatev1()`` with a header line
239 indicating file count and byte size.
238 This is the data output from ``generatev1()`` with 2 header lines. The
239 first line indicates overall success. The 2nd contains the file count and
240 byte size of payload.
241
242 The success line contains "0" for success, "1" for stream generation not
243 allowed, and "2" for error locking the repository (possibly indicating
244 a permissions error for the server process).
240 245 """
246 if not allowservergeneration(repo):
247 yield '1\n'
248 return
249
250 try:
241 251 filecount, bytecount, it = generatev1(repo)
252 except error.LockError:
253 yield '2\n'
254 return
255
256 # Indicates successful response.
257 yield '0\n'
242 258 yield '%d %d\n' % (filecount, bytecount)
243 259 for chunk in it:
244 260 yield chunk
245 261
246 262 def generatebundlev1(repo, compression='UN'):
247 263 """Emit content for version 1 of a stream clone bundle.
248 264
249 265 The first 4 bytes of the output ("HGS1") denote this as stream clone
250 266 bundle version 1.
251 267
252 268 The next 2 bytes indicate the compression type. Only "UN" is currently
253 269 supported.
254 270
255 271 The next 16 bytes are two 64-bit big endian unsigned integers indicating
256 272 file count and byte count, respectively.
257 273
258 274 The next 2 bytes is a 16-bit big endian unsigned short declaring the length
259 275 of the requirements string, including a trailing \0. The following N bytes
260 276 are the requirements string, which is ASCII containing a comma-delimited
261 277 list of repo requirements that are needed to support the data.
262 278
263 279 The remaining content is the output of ``generatev1()`` (which may be
264 280 compressed in the future).
265 281
266 282 Returns a tuple of (requirements, data generator).
267 283 """
268 284 if compression != 'UN':
269 285 raise ValueError('we do not support the compression argument yet')
270 286
271 287 requirements = repo.requirements & repo.supportedformats
272 288 requires = ','.join(sorted(requirements))
273 289
274 290 def gen():
275 291 yield 'HGS1'
276 292 yield compression
277 293
278 294 filecount, bytecount, it = generatev1(repo)
279 295 repo.ui.status(_('writing %d bytes for %d files\n') %
280 296 (bytecount, filecount))
281 297
282 298 yield struct.pack('>QQ', filecount, bytecount)
283 299 yield struct.pack('>H', len(requires) + 1)
284 300 yield requires + '\0'
285 301
286 302 # This is where we'll add compression in the future.
287 303 assert compression == 'UN'
288 304
289 305 seen = 0
290 306 repo.ui.progress(_('bundle'), 0, total=bytecount, unit=_('bytes'))
291 307
292 308 for chunk in it:
293 309 seen += len(chunk)
294 310 repo.ui.progress(_('bundle'), seen, total=bytecount,
295 311 unit=_('bytes'))
296 312 yield chunk
297 313
298 314 repo.ui.progress(_('bundle'), None)
299 315
300 316 return requirements, gen()
301 317
302 318 def consumev1(repo, fp, filecount, bytecount):
303 319 """Apply the contents from version 1 of a streaming clone file handle.
304 320
305 321 This takes the output from "stream_out" and applies it to the specified
306 322 repository.
307 323
308 324 Like "stream_out," the status line added by the wire protocol is not
309 325 handled by this function.
310 326 """
311 327 with repo.lock():
312 328 repo.ui.status(_('%d files to transfer, %s of data\n') %
313 329 (filecount, util.bytecount(bytecount)))
314 330 handled_bytes = 0
315 331 repo.ui.progress(_('clone'), 0, total=bytecount, unit=_('bytes'))
316 332 start = util.timer()
317 333
318 334 # TODO: get rid of (potential) inconsistency
319 335 #
320 336 # If transaction is started and any @filecache property is
321 337 # changed at this point, it causes inconsistency between
322 338 # in-memory cached property and streamclone-ed file on the
323 339 # disk. Nested transaction prevents transaction scope "clone"
324 340 # below from writing in-memory changes out at the end of it,
325 341 # even though in-memory changes are discarded at the end of it
326 342 # regardless of transaction nesting.
327 343 #
328 344 # But transaction nesting can't be simply prohibited, because
329 345 # nesting occurs also in ordinary case (e.g. enabling
330 346 # clonebundles).
331 347
332 348 with repo.transaction('clone'):
333 349 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
334 350 for i in xrange(filecount):
335 351 # XXX doesn't support '\n' or '\r' in filenames
336 352 l = fp.readline()
337 353 try:
338 354 name, size = l.split('\0', 1)
339 355 size = int(size)
340 356 except (ValueError, TypeError):
341 357 raise error.ResponseError(
342 358 _('unexpected response from remote server:'), l)
343 359 if repo.ui.debugflag:
344 360 repo.ui.debug('adding %s (%s)\n' %
345 361 (name, util.bytecount(size)))
346 362 # for backwards compat, name was partially encoded
347 363 path = store.decodedir(name)
348 364 with repo.svfs(path, 'w', backgroundclose=True) as ofp:
349 365 for chunk in util.filechunkiter(fp, limit=size):
350 366 handled_bytes += len(chunk)
351 367 repo.ui.progress(_('clone'), handled_bytes,
352 368 total=bytecount, unit=_('bytes'))
353 369 ofp.write(chunk)
354 370
355 371 # force @filecache properties to be reloaded from
356 372 # streamclone-ed file at next access
357 373 repo.invalidate(clearfilecache=True)
358 374
359 375 elapsed = util.timer() - start
360 376 if elapsed <= 0:
361 377 elapsed = 0.001
362 378 repo.ui.progress(_('clone'), None)
363 379 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
364 380 (util.bytecount(bytecount), elapsed,
365 381 util.bytecount(bytecount / elapsed)))
366 382
367 383 def readbundle1header(fp):
368 384 compression = fp.read(2)
369 385 if compression != 'UN':
370 386 raise error.Abort(_('only uncompressed stream clone bundles are '
371 387 'supported; got %s') % compression)
372 388
373 389 filecount, bytecount = struct.unpack('>QQ', fp.read(16))
374 390 requireslen = struct.unpack('>H', fp.read(2))[0]
375 391 requires = fp.read(requireslen)
376 392
377 393 if not requires.endswith('\0'):
378 394 raise error.Abort(_('malformed stream clone bundle: '
379 395 'requirements not properly encoded'))
380 396
381 397 requirements = set(requires.rstrip('\0').split(','))
382 398
383 399 return filecount, bytecount, requirements
384 400
385 401 def applybundlev1(repo, fp):
386 402 """Apply the content from a stream clone bundle version 1.
387 403
388 404 We assume the 4 byte header has been read and validated and the file handle
389 405 is at the 2 byte compression identifier.
390 406 """
391 407 if len(repo):
392 408 raise error.Abort(_('cannot apply stream clone bundle on non-empty '
393 409 'repo'))
394 410
395 411 filecount, bytecount, requirements = readbundle1header(fp)
396 412 missingreqs = requirements - repo.supportedformats
397 413 if missingreqs:
398 414 raise error.Abort(_('unable to apply stream clone: '
399 415 'unsupported format: %s') %
400 416 ', '.join(sorted(missingreqs)))
401 417
402 418 consumev1(repo, fp, filecount, bytecount)
403 419
404 420 class streamcloneapplier(object):
405 421 """Class to manage applying streaming clone bundles.
406 422
407 423 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
408 424 readers to perform bundle type-specific functionality.
409 425 """
410 426 def __init__(self, fh):
411 427 self._fh = fh
412 428
413 429 def apply(self, repo):
414 430 return applybundlev1(repo, self._fh)
@@ -1,1070 +1,1056 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import os
12 12 import tempfile
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 hex,
18 18 nullid,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 discovery,
25 25 encoding,
26 26 error,
27 27 exchange,
28 28 peer,
29 29 pushkey as pushkeymod,
30 30 pycompat,
31 31 repository,
32 32 streamclone,
33 33 util,
34 34 )
35 35
36 36 urlerr = util.urlerr
37 37 urlreq = util.urlreq
38 38
39 39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
40 40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
41 41 'IncompatibleClient')
42 42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
43 43
44 44 class abstractserverproto(object):
45 45 """abstract class that summarizes the protocol API
46 46
47 47 Used as reference and documentation.
48 48 """
49 49
50 50 def getargs(self, args):
51 51 """return the value for arguments in <args>
52 52
53 53 returns a list of values (same order as <args>)"""
54 54 raise NotImplementedError()
55 55
56 56 def getfile(self, fp):
57 57 """write the whole content of a file into a file like object
58 58
59 59 The file is in the form::
60 60
61 61 (<chunk-size>\n<chunk>)+0\n
62 62
63 63 chunk size is the ascii version of the int.
64 64 """
65 65 raise NotImplementedError()
66 66
67 67 def redirect(self):
68 68 """may setup interception for stdout and stderr
69 69
70 70 See also the `restore` method."""
71 71 raise NotImplementedError()
72 72
73 73 # If the `redirect` function does install interception, the `restore`
74 74 # function MUST be defined. If interception is not used, this function
75 75 # MUST NOT be defined.
76 76 #
77 77 # left commented here on purpose
78 78 #
79 79 #def restore(self):
80 80 # """reinstall previous stdout and stderr and return intercepted stdout
81 81 # """
82 82 # raise NotImplementedError()
83 83
84 84 class remoteiterbatcher(peer.iterbatcher):
85 85 def __init__(self, remote):
86 86 super(remoteiterbatcher, self).__init__()
87 87 self._remote = remote
88 88
89 89 def __getattr__(self, name):
90 90 # Validate this method is batchable, since submit() only supports
91 91 # batchable methods.
92 92 fn = getattr(self._remote, name)
93 93 if not getattr(fn, 'batchable', None):
94 94 raise error.ProgrammingError('Attempted to batch a non-batchable '
95 95 'call to %r' % name)
96 96
97 97 return super(remoteiterbatcher, self).__getattr__(name)
98 98
99 99 def submit(self):
100 100 """Break the batch request into many patch calls and pipeline them.
101 101
102 102 This is mostly valuable over http where request sizes can be
103 103 limited, but can be used in other places as well.
104 104 """
105 105 # 2-tuple of (command, arguments) that represents what will be
106 106 # sent over the wire.
107 107 requests = []
108 108
109 109 # 4-tuple of (command, final future, @batchable generator, remote
110 110 # future).
111 111 results = []
112 112
113 113 for command, args, opts, finalfuture in self.calls:
114 114 mtd = getattr(self._remote, command)
115 115 batchable = mtd.batchable(mtd.__self__, *args, **opts)
116 116
117 117 commandargs, fremote = next(batchable)
118 118 assert fremote
119 119 requests.append((command, commandargs))
120 120 results.append((command, finalfuture, batchable, fremote))
121 121
122 122 if requests:
123 123 self._resultiter = self._remote._submitbatch(requests)
124 124
125 125 self._results = results
126 126
127 127 def results(self):
128 128 for command, finalfuture, batchable, remotefuture in self._results:
129 129 # Get the raw result, set it in the remote future, feed it
130 130 # back into the @batchable generator so it can be decoded, and
131 131 # set the result on the final future to this value.
132 132 remoteresult = next(self._resultiter)
133 133 remotefuture.set(remoteresult)
134 134 finalfuture.set(next(batchable))
135 135
136 136 # Verify our @batchable generators only emit 2 values.
137 137 try:
138 138 next(batchable)
139 139 except StopIteration:
140 140 pass
141 141 else:
142 142 raise error.ProgrammingError('%s @batchable generator emitted '
143 143 'unexpected value count' % command)
144 144
145 145 yield finalfuture.value
146 146
147 147 # Forward a couple of names from peer to make wireproto interactions
148 148 # slightly more sensible.
149 149 batchable = peer.batchable
150 150 future = peer.future
151 151
152 152 # list of nodes encoding / decoding
153 153
154 154 def decodelist(l, sep=' '):
155 155 if l:
156 156 return [bin(v) for v in l.split(sep)]
157 157 return []
158 158
159 159 def encodelist(l, sep=' '):
160 160 try:
161 161 return sep.join(map(hex, l))
162 162 except TypeError:
163 163 raise
164 164
165 165 # batched call argument encoding
166 166
167 167 def escapearg(plain):
168 168 return (plain
169 169 .replace(':', ':c')
170 170 .replace(',', ':o')
171 171 .replace(';', ':s')
172 172 .replace('=', ':e'))
173 173
174 174 def unescapearg(escaped):
175 175 return (escaped
176 176 .replace(':e', '=')
177 177 .replace(':s', ';')
178 178 .replace(':o', ',')
179 179 .replace(':c', ':'))
180 180
181 181 def encodebatchcmds(req):
182 182 """Return a ``cmds`` argument value for the ``batch`` command."""
183 183 cmds = []
184 184 for op, argsdict in req:
185 185 # Old servers didn't properly unescape argument names. So prevent
186 186 # the sending of argument names that may not be decoded properly by
187 187 # servers.
188 188 assert all(escapearg(k) == k for k in argsdict)
189 189
190 190 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
191 191 for k, v in argsdict.iteritems())
192 192 cmds.append('%s %s' % (op, args))
193 193
194 194 return ';'.join(cmds)
195 195
196 196 # mapping of options accepted by getbundle and their types
197 197 #
198 198 # Meant to be extended by extensions. It is extensions responsibility to ensure
199 199 # such options are properly processed in exchange.getbundle.
200 200 #
201 201 # supported types are:
202 202 #
203 203 # :nodes: list of binary nodes
204 204 # :csv: list of comma-separated values
205 205 # :scsv: list of comma-separated values return as set
206 206 # :plain: string with no transformation needed.
207 207 gboptsmap = {'heads': 'nodes',
208 208 'bookmarks': 'boolean',
209 209 'common': 'nodes',
210 210 'obsmarkers': 'boolean',
211 211 'phases': 'boolean',
212 212 'bundlecaps': 'scsv',
213 213 'listkeys': 'csv',
214 214 'cg': 'boolean',
215 215 'cbattempted': 'boolean'}
216 216
217 217 # client side
218 218
219 219 class wirepeer(repository.legacypeer):
220 220 """Client-side interface for communicating with a peer repository.
221 221
222 222 Methods commonly call wire protocol commands of the same name.
223 223
224 224 See also httppeer.py and sshpeer.py for protocol-specific
225 225 implementations of this interface.
226 226 """
227 227 # Begin of basewirepeer interface.
228 228
229 229 def iterbatch(self):
230 230 return remoteiterbatcher(self)
231 231
232 232 @batchable
233 233 def lookup(self, key):
234 234 self.requirecap('lookup', _('look up remote revision'))
235 235 f = future()
236 236 yield {'key': encoding.fromlocal(key)}, f
237 237 d = f.value
238 238 success, data = d[:-1].split(" ", 1)
239 239 if int(success):
240 240 yield bin(data)
241 241 else:
242 242 self._abort(error.RepoError(data))
243 243
244 244 @batchable
245 245 def heads(self):
246 246 f = future()
247 247 yield {}, f
248 248 d = f.value
249 249 try:
250 250 yield decodelist(d[:-1])
251 251 except ValueError:
252 252 self._abort(error.ResponseError(_("unexpected response:"), d))
253 253
254 254 @batchable
255 255 def known(self, nodes):
256 256 f = future()
257 257 yield {'nodes': encodelist(nodes)}, f
258 258 d = f.value
259 259 try:
260 260 yield [bool(int(b)) for b in d]
261 261 except ValueError:
262 262 self._abort(error.ResponseError(_("unexpected response:"), d))
263 263
264 264 @batchable
265 265 def branchmap(self):
266 266 f = future()
267 267 yield {}, f
268 268 d = f.value
269 269 try:
270 270 branchmap = {}
271 271 for branchpart in d.splitlines():
272 272 branchname, branchheads = branchpart.split(' ', 1)
273 273 branchname = encoding.tolocal(urlreq.unquote(branchname))
274 274 branchheads = decodelist(branchheads)
275 275 branchmap[branchname] = branchheads
276 276 yield branchmap
277 277 except TypeError:
278 278 self._abort(error.ResponseError(_("unexpected response:"), d))
279 279
280 280 @batchable
281 281 def listkeys(self, namespace):
282 282 if not self.capable('pushkey'):
283 283 yield {}, None
284 284 f = future()
285 285 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
286 286 yield {'namespace': encoding.fromlocal(namespace)}, f
287 287 d = f.value
288 288 self.ui.debug('received listkey for "%s": %i bytes\n'
289 289 % (namespace, len(d)))
290 290 yield pushkeymod.decodekeys(d)
291 291
292 292 @batchable
293 293 def pushkey(self, namespace, key, old, new):
294 294 if not self.capable('pushkey'):
295 295 yield False, None
296 296 f = future()
297 297 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
298 298 yield {'namespace': encoding.fromlocal(namespace),
299 299 'key': encoding.fromlocal(key),
300 300 'old': encoding.fromlocal(old),
301 301 'new': encoding.fromlocal(new)}, f
302 302 d = f.value
303 303 d, output = d.split('\n', 1)
304 304 try:
305 305 d = bool(int(d))
306 306 except ValueError:
307 307 raise error.ResponseError(
308 308 _('push failed (unexpected response):'), d)
309 309 for l in output.splitlines(True):
310 310 self.ui.status(_('remote: '), l)
311 311 yield d
312 312
313 313 def stream_out(self):
314 314 return self._callstream('stream_out')
315 315
316 316 def getbundle(self, source, **kwargs):
317 317 kwargs = pycompat.byteskwargs(kwargs)
318 318 self.requirecap('getbundle', _('look up remote changes'))
319 319 opts = {}
320 320 bundlecaps = kwargs.get('bundlecaps')
321 321 if bundlecaps is not None:
322 322 kwargs['bundlecaps'] = sorted(bundlecaps)
323 323 else:
324 324 bundlecaps = () # kwargs could have it to None
325 325 for key, value in kwargs.iteritems():
326 326 if value is None:
327 327 continue
328 328 keytype = gboptsmap.get(key)
329 329 if keytype is None:
330 330 raise error.ProgrammingError(
331 331 'Unexpectedly None keytype for key %s' % key)
332 332 elif keytype == 'nodes':
333 333 value = encodelist(value)
334 334 elif keytype in ('csv', 'scsv'):
335 335 value = ','.join(value)
336 336 elif keytype == 'boolean':
337 337 value = '%i' % bool(value)
338 338 elif keytype != 'plain':
339 339 raise KeyError('unknown getbundle option type %s'
340 340 % keytype)
341 341 opts[key] = value
342 342 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
343 343 if any((cap.startswith('HG2') for cap in bundlecaps)):
344 344 return bundle2.getunbundler(self.ui, f)
345 345 else:
346 346 return changegroupmod.cg1unpacker(f, 'UN')
347 347
348 348 def unbundle(self, cg, heads, url):
349 349 '''Send cg (a readable file-like object representing the
350 350 changegroup to push, typically a chunkbuffer object) to the
351 351 remote server as a bundle.
352 352
353 353 When pushing a bundle10 stream, return an integer indicating the
354 354 result of the push (see changegroup.apply()).
355 355
356 356 When pushing a bundle20 stream, return a bundle20 stream.
357 357
358 358 `url` is the url the client thinks it's pushing to, which is
359 359 visible to hooks.
360 360 '''
361 361
362 362 if heads != ['force'] and self.capable('unbundlehash'):
363 363 heads = encodelist(['hashed',
364 364 hashlib.sha1(''.join(sorted(heads))).digest()])
365 365 else:
366 366 heads = encodelist(heads)
367 367
368 368 if util.safehasattr(cg, 'deltaheader'):
369 369 # this a bundle10, do the old style call sequence
370 370 ret, output = self._callpush("unbundle", cg, heads=heads)
371 371 if ret == "":
372 372 raise error.ResponseError(
373 373 _('push failed:'), output)
374 374 try:
375 375 ret = int(ret)
376 376 except ValueError:
377 377 raise error.ResponseError(
378 378 _('push failed (unexpected response):'), ret)
379 379
380 380 for l in output.splitlines(True):
381 381 self.ui.status(_('remote: '), l)
382 382 else:
383 383 # bundle2 push. Send a stream, fetch a stream.
384 384 stream = self._calltwowaystream('unbundle', cg, heads=heads)
385 385 ret = bundle2.getunbundler(self.ui, stream)
386 386 return ret
387 387
388 388 # End of basewirepeer interface.
389 389
390 390 # Begin of baselegacywirepeer interface.
391 391
392 392 def branches(self, nodes):
393 393 n = encodelist(nodes)
394 394 d = self._call("branches", nodes=n)
395 395 try:
396 396 br = [tuple(decodelist(b)) for b in d.splitlines()]
397 397 return br
398 398 except ValueError:
399 399 self._abort(error.ResponseError(_("unexpected response:"), d))
400 400
401 401 def between(self, pairs):
402 402 batch = 8 # avoid giant requests
403 403 r = []
404 404 for i in xrange(0, len(pairs), batch):
405 405 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
406 406 d = self._call("between", pairs=n)
407 407 try:
408 408 r.extend(l and decodelist(l) or [] for l in d.splitlines())
409 409 except ValueError:
410 410 self._abort(error.ResponseError(_("unexpected response:"), d))
411 411 return r
412 412
413 413 def changegroup(self, nodes, kind):
414 414 n = encodelist(nodes)
415 415 f = self._callcompressable("changegroup", roots=n)
416 416 return changegroupmod.cg1unpacker(f, 'UN')
417 417
418 418 def changegroupsubset(self, bases, heads, kind):
419 419 self.requirecap('changegroupsubset', _('look up remote changes'))
420 420 bases = encodelist(bases)
421 421 heads = encodelist(heads)
422 422 f = self._callcompressable("changegroupsubset",
423 423 bases=bases, heads=heads)
424 424 return changegroupmod.cg1unpacker(f, 'UN')
425 425
426 426 # End of baselegacywirepeer interface.
427 427
428 428 def _submitbatch(self, req):
429 429 """run batch request <req> on the server
430 430
431 431 Returns an iterator of the raw responses from the server.
432 432 """
433 433 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
434 434 chunk = rsp.read(1024)
435 435 work = [chunk]
436 436 while chunk:
437 437 while ';' not in chunk and chunk:
438 438 chunk = rsp.read(1024)
439 439 work.append(chunk)
440 440 merged = ''.join(work)
441 441 while ';' in merged:
442 442 one, merged = merged.split(';', 1)
443 443 yield unescapearg(one)
444 444 chunk = rsp.read(1024)
445 445 work = [merged, chunk]
446 446 yield unescapearg(''.join(work))
447 447
448 448 def _submitone(self, op, args):
449 449 return self._call(op, **pycompat.strkwargs(args))
450 450
451 451 def debugwireargs(self, one, two, three=None, four=None, five=None):
452 452 # don't pass optional arguments left at their default value
453 453 opts = {}
454 454 if three is not None:
455 455 opts[r'three'] = three
456 456 if four is not None:
457 457 opts[r'four'] = four
458 458 return self._call('debugwireargs', one=one, two=two, **opts)
459 459
460 460 def _call(self, cmd, **args):
461 461 """execute <cmd> on the server
462 462
463 463 The command is expected to return a simple string.
464 464
465 465 returns the server reply as a string."""
466 466 raise NotImplementedError()
467 467
468 468 def _callstream(self, cmd, **args):
469 469 """execute <cmd> on the server
470 470
471 471 The command is expected to return a stream. Note that if the
472 472 command doesn't return a stream, _callstream behaves
473 473 differently for ssh and http peers.
474 474
475 475 returns the server reply as a file like object.
476 476 """
477 477 raise NotImplementedError()
478 478
479 479 def _callcompressable(self, cmd, **args):
480 480 """execute <cmd> on the server
481 481
482 482 The command is expected to return a stream.
483 483
484 484 The stream may have been compressed in some implementations. This
485 485 function takes care of the decompression. This is the only difference
486 486 with _callstream.
487 487
488 488 returns the server reply as a file like object.
489 489 """
490 490 raise NotImplementedError()
491 491
492 492 def _callpush(self, cmd, fp, **args):
493 493 """execute a <cmd> on server
494 494
495 495 The command is expected to be related to a push. Push has a special
496 496 return method.
497 497
498 498 returns the server reply as a (ret, output) tuple. ret is either
499 499 empty (error) or a stringified int.
500 500 """
501 501 raise NotImplementedError()
502 502
503 503 def _calltwowaystream(self, cmd, fp, **args):
504 504 """execute <cmd> on server
505 505
506 506 The command will send a stream to the server and get a stream in reply.
507 507 """
508 508 raise NotImplementedError()
509 509
510 510 def _abort(self, exception):
511 511 """clearly abort the wire protocol connection and raise the exception
512 512 """
513 513 raise NotImplementedError()
514 514
515 515 # server side
516 516
517 517 # wire protocol command can either return a string or one of these classes.
518 518 class streamres(object):
519 519 """wireproto reply: binary stream
520 520
521 521 The call was successful and the result is a stream.
522 522
523 523 Accepts either a generator or an object with a ``read(size)`` method.
524 524
525 525 ``v1compressible`` indicates whether this data can be compressed to
526 526 "version 1" clients (technically: HTTP peers using
527 527 application/mercurial-0.1 media type). This flag should NOT be used on
528 528 new commands because new clients should support a more modern compression
529 529 mechanism.
530 530 """
531 531 def __init__(self, gen=None, reader=None, v1compressible=False):
532 532 self.gen = gen
533 533 self.reader = reader
534 534 self.v1compressible = v1compressible
535 535
536 536 class pushres(object):
537 537 """wireproto reply: success with simple integer return
538 538
539 539 The call was successful and returned an integer contained in `self.res`.
540 540 """
541 541 def __init__(self, res):
542 542 self.res = res
543 543
544 544 class pusherr(object):
545 545 """wireproto reply: failure
546 546
547 547 The call failed. The `self.res` attribute contains the error message.
548 548 """
549 549 def __init__(self, res):
550 550 self.res = res
551 551
552 552 class ooberror(object):
553 553 """wireproto reply: failure of a batch of operation
554 554
555 555 Something failed during a batch call. The error message is stored in
556 556 `self.message`.
557 557 """
558 558 def __init__(self, message):
559 559 self.message = message
560 560
561 561 def getdispatchrepo(repo, proto, command):
562 562 """Obtain the repo used for processing wire protocol commands.
563 563
564 564 The intent of this function is to serve as a monkeypatch point for
565 565 extensions that need commands to operate on different repo views under
566 566 specialized circumstances.
567 567 """
568 568 return repo.filtered('served')
569 569
570 570 def dispatch(repo, proto, command):
571 571 repo = getdispatchrepo(repo, proto, command)
572 572 func, spec = commands[command]
573 573 args = proto.getargs(spec)
574 574 return func(repo, proto, *args)
575 575
576 576 def options(cmd, keys, others):
577 577 opts = {}
578 578 for k in keys:
579 579 if k in others:
580 580 opts[k] = others[k]
581 581 del others[k]
582 582 if others:
583 583 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
584 584 % (cmd, ",".join(others)))
585 585 return opts
586 586
587 587 def bundle1allowed(repo, action):
588 588 """Whether a bundle1 operation is allowed from the server.
589 589
590 590 Priority is:
591 591
592 592 1. server.bundle1gd.<action> (if generaldelta active)
593 593 2. server.bundle1.<action>
594 594 3. server.bundle1gd (if generaldelta active)
595 595 4. server.bundle1
596 596 """
597 597 ui = repo.ui
598 598 gd = 'generaldelta' in repo.requirements
599 599
600 600 if gd:
601 601 v = ui.configbool('server', 'bundle1gd.%s' % action)
602 602 if v is not None:
603 603 return v
604 604
605 605 v = ui.configbool('server', 'bundle1.%s' % action)
606 606 if v is not None:
607 607 return v
608 608
609 609 if gd:
610 610 v = ui.configbool('server', 'bundle1gd')
611 611 if v is not None:
612 612 return v
613 613
614 614 return ui.configbool('server', 'bundle1')
615 615
616 616 def supportedcompengines(ui, proto, role):
617 617 """Obtain the list of supported compression engines for a request."""
618 618 assert role in (util.CLIENTROLE, util.SERVERROLE)
619 619
620 620 compengines = util.compengines.supportedwireengines(role)
621 621
622 622 # Allow config to override default list and ordering.
623 623 if role == util.SERVERROLE:
624 624 configengines = ui.configlist('server', 'compressionengines')
625 625 config = 'server.compressionengines'
626 626 else:
627 627 # This is currently implemented mainly to facilitate testing. In most
628 628 # cases, the server should be in charge of choosing a compression engine
629 629 # because a server has the most to lose from a sub-optimal choice. (e.g.
630 630 # CPU DoS due to an expensive engine or a network DoS due to poor
631 631 # compression ratio).
632 632 configengines = ui.configlist('experimental',
633 633 'clientcompressionengines')
634 634 config = 'experimental.clientcompressionengines'
635 635
636 636 # No explicit config. Filter out the ones that aren't supposed to be
637 637 # advertised and return default ordering.
638 638 if not configengines:
639 639 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
640 640 return [e for e in compengines
641 641 if getattr(e.wireprotosupport(), attr) > 0]
642 642
643 643 # If compression engines are listed in the config, assume there is a good
644 644 # reason for it (like server operators wanting to achieve specific
645 645 # performance characteristics). So fail fast if the config references
646 646 # unusable compression engines.
647 647 validnames = set(e.name() for e in compengines)
648 648 invalidnames = set(e for e in configengines if e not in validnames)
649 649 if invalidnames:
650 650 raise error.Abort(_('invalid compression engine defined in %s: %s') %
651 651 (config, ', '.join(sorted(invalidnames))))
652 652
653 653 compengines = [e for e in compengines if e.name() in configengines]
654 654 compengines = sorted(compengines,
655 655 key=lambda e: configengines.index(e.name()))
656 656
657 657 if not compengines:
658 658 raise error.Abort(_('%s config option does not specify any known '
659 659 'compression engines') % config,
660 660 hint=_('usable compression engines: %s') %
661 661 ', '.sorted(validnames))
662 662
663 663 return compengines
664 664
665 665 # list of commands
666 666 commands = {}
667 667
668 668 def wireprotocommand(name, args=''):
669 669 """decorator for wire protocol command"""
670 670 def register(func):
671 671 commands[name] = (func, args)
672 672 return func
673 673 return register
674 674
675 675 @wireprotocommand('batch', 'cmds *')
676 676 def batch(repo, proto, cmds, others):
677 677 repo = repo.filtered("served")
678 678 res = []
679 679 for pair in cmds.split(';'):
680 680 op, args = pair.split(' ', 1)
681 681 vals = {}
682 682 for a in args.split(','):
683 683 if a:
684 684 n, v = a.split('=')
685 685 vals[unescapearg(n)] = unescapearg(v)
686 686 func, spec = commands[op]
687 687 if spec:
688 688 keys = spec.split()
689 689 data = {}
690 690 for k in keys:
691 691 if k == '*':
692 692 star = {}
693 693 for key in vals.keys():
694 694 if key not in keys:
695 695 star[key] = vals[key]
696 696 data['*'] = star
697 697 else:
698 698 data[k] = vals[k]
699 699 result = func(repo, proto, *[data[k] for k in keys])
700 700 else:
701 701 result = func(repo, proto)
702 702 if isinstance(result, ooberror):
703 703 return result
704 704 res.append(escapearg(result))
705 705 return ';'.join(res)
706 706
707 707 @wireprotocommand('between', 'pairs')
708 708 def between(repo, proto, pairs):
709 709 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
710 710 r = []
711 711 for b in repo.between(pairs):
712 712 r.append(encodelist(b) + "\n")
713 713 return "".join(r)
714 714
715 715 @wireprotocommand('branchmap')
716 716 def branchmap(repo, proto):
717 717 branchmap = repo.branchmap()
718 718 heads = []
719 719 for branch, nodes in branchmap.iteritems():
720 720 branchname = urlreq.quote(encoding.fromlocal(branch))
721 721 branchnodes = encodelist(nodes)
722 722 heads.append('%s %s' % (branchname, branchnodes))
723 723 return '\n'.join(heads)
724 724
725 725 @wireprotocommand('branches', 'nodes')
726 726 def branches(repo, proto, nodes):
727 727 nodes = decodelist(nodes)
728 728 r = []
729 729 for b in repo.branches(nodes):
730 730 r.append(encodelist(b) + "\n")
731 731 return "".join(r)
732 732
733 733 @wireprotocommand('clonebundles', '')
734 734 def clonebundles(repo, proto):
735 735 """Server command for returning info for available bundles to seed clones.
736 736
737 737 Clients will parse this response and determine what bundle to fetch.
738 738
739 739 Extensions may wrap this command to filter or dynamically emit data
740 740 depending on the request. e.g. you could advertise URLs for the closest
741 741 data center given the client's IP address.
742 742 """
743 743 return repo.vfs.tryread('clonebundles.manifest')
744 744
745 745 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
746 746 'known', 'getbundle', 'unbundlehash', 'batch']
747 747
748 748 def _capabilities(repo, proto):
749 749 """return a list of capabilities for a repo
750 750
751 751 This function exists to allow extensions to easily wrap capabilities
752 752 computation
753 753
754 754 - returns a lists: easy to alter
755 755 - change done here will be propagated to both `capabilities` and `hello`
756 756 command without any other action needed.
757 757 """
758 758 # copy to prevent modification of the global list
759 759 caps = list(wireprotocaps)
760 760 if streamclone.allowservergeneration(repo):
761 761 if repo.ui.configbool('server', 'preferuncompressed'):
762 762 caps.append('stream-preferred')
763 763 requiredformats = repo.requirements & repo.supportedformats
764 764 # if our local revlogs are just revlogv1, add 'stream' cap
765 765 if not requiredformats - {'revlogv1'}:
766 766 caps.append('stream')
767 767 # otherwise, add 'streamreqs' detailing our local revlog format
768 768 else:
769 769 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
770 770 if repo.ui.configbool('experimental', 'bundle2-advertise'):
771 771 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
772 772 caps.append('bundle2=' + urlreq.quote(capsblob))
773 773 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
774 774
775 775 if proto.name == 'http':
776 776 caps.append('httpheader=%d' %
777 777 repo.ui.configint('server', 'maxhttpheaderlen'))
778 778 if repo.ui.configbool('experimental', 'httppostargs'):
779 779 caps.append('httppostargs')
780 780
781 781 # FUTURE advertise 0.2rx once support is implemented
782 782 # FUTURE advertise minrx and mintx after consulting config option
783 783 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
784 784
785 785 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
786 786 if compengines:
787 787 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
788 788 for e in compengines)
789 789 caps.append('compression=%s' % comptypes)
790 790
791 791 return caps
792 792
793 793 # If you are writing an extension and consider wrapping this function. Wrap
794 794 # `_capabilities` instead.
795 795 @wireprotocommand('capabilities')
796 796 def capabilities(repo, proto):
797 797 return ' '.join(_capabilities(repo, proto))
798 798
799 799 @wireprotocommand('changegroup', 'roots')
800 800 def changegroup(repo, proto, roots):
801 801 nodes = decodelist(roots)
802 802 outgoing = discovery.outgoing(repo, missingroots=nodes,
803 803 missingheads=repo.heads())
804 804 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
805 805 return streamres(reader=cg, v1compressible=True)
806 806
807 807 @wireprotocommand('changegroupsubset', 'bases heads')
808 808 def changegroupsubset(repo, proto, bases, heads):
809 809 bases = decodelist(bases)
810 810 heads = decodelist(heads)
811 811 outgoing = discovery.outgoing(repo, missingroots=bases,
812 812 missingheads=heads)
813 813 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
814 814 return streamres(reader=cg, v1compressible=True)
815 815
816 816 @wireprotocommand('debugwireargs', 'one two *')
817 817 def debugwireargs(repo, proto, one, two, others):
818 818 # only accept optional args from the known set
819 819 opts = options('debugwireargs', ['three', 'four'], others)
820 820 return repo.debugwireargs(one, two, **pycompat.strkwargs(opts))
821 821
822 822 @wireprotocommand('getbundle', '*')
823 823 def getbundle(repo, proto, others):
824 824 opts = options('getbundle', gboptsmap.keys(), others)
825 825 for k, v in opts.iteritems():
826 826 keytype = gboptsmap[k]
827 827 if keytype == 'nodes':
828 828 opts[k] = decodelist(v)
829 829 elif keytype == 'csv':
830 830 opts[k] = list(v.split(','))
831 831 elif keytype == 'scsv':
832 832 opts[k] = set(v.split(','))
833 833 elif keytype == 'boolean':
834 834 # Client should serialize False as '0', which is a non-empty string
835 835 # so it evaluates as a True bool.
836 836 if v == '0':
837 837 opts[k] = False
838 838 else:
839 839 opts[k] = bool(v)
840 840 elif keytype != 'plain':
841 841 raise KeyError('unknown getbundle option type %s'
842 842 % keytype)
843 843
844 844 if not bundle1allowed(repo, 'pull'):
845 845 if not exchange.bundle2requested(opts.get('bundlecaps')):
846 846 if proto.name == 'http':
847 847 return ooberror(bundle2required)
848 848 raise error.Abort(bundle2requiredmain,
849 849 hint=bundle2requiredhint)
850 850
851 851 try:
852 852 if repo.ui.configbool('server', 'disablefullbundle'):
853 853 # Check to see if this is a full clone.
854 854 clheads = set(repo.changelog.heads())
855 855 heads = set(opts.get('heads', set()))
856 856 common = set(opts.get('common', set()))
857 857 common.discard(nullid)
858 858 if not common and clheads == heads:
859 859 raise error.Abort(
860 860 _('server has pull-based clones disabled'),
861 861 hint=_('remove --pull if specified or upgrade Mercurial'))
862 862
863 863 chunks = exchange.getbundlechunks(repo, 'serve',
864 864 **pycompat.strkwargs(opts))
865 865 except error.Abort as exc:
866 866 # cleanly forward Abort error to the client
867 867 if not exchange.bundle2requested(opts.get('bundlecaps')):
868 868 if proto.name == 'http':
869 869 return ooberror(str(exc) + '\n')
870 870 raise # cannot do better for bundle1 + ssh
871 871 # bundle2 request expect a bundle2 reply
872 872 bundler = bundle2.bundle20(repo.ui)
873 873 manargs = [('message', str(exc))]
874 874 advargs = []
875 875 if exc.hint is not None:
876 876 advargs.append(('hint', exc.hint))
877 877 bundler.addpart(bundle2.bundlepart('error:abort',
878 878 manargs, advargs))
879 879 return streamres(gen=bundler.getchunks(), v1compressible=True)
880 880 return streamres(gen=chunks, v1compressible=True)
881 881
882 882 @wireprotocommand('heads')
883 883 def heads(repo, proto):
884 884 h = repo.heads()
885 885 return encodelist(h) + "\n"
886 886
887 887 @wireprotocommand('hello')
888 888 def hello(repo, proto):
889 889 '''the hello command returns a set of lines describing various
890 890 interesting things about the server, in an RFC822-like format.
891 891 Currently the only one defined is "capabilities", which
892 892 consists of a line in the form:
893 893
894 894 capabilities: space separated list of tokens
895 895 '''
896 896 return "capabilities: %s\n" % (capabilities(repo, proto))
897 897
898 898 @wireprotocommand('listkeys', 'namespace')
899 899 def listkeys(repo, proto, namespace):
900 900 d = repo.listkeys(encoding.tolocal(namespace)).items()
901 901 return pushkeymod.encodekeys(d)
902 902
903 903 @wireprotocommand('lookup', 'key')
904 904 def lookup(repo, proto, key):
905 905 try:
906 906 k = encoding.tolocal(key)
907 907 c = repo[k]
908 908 r = c.hex()
909 909 success = 1
910 910 except Exception as inst:
911 911 r = str(inst)
912 912 success = 0
913 913 return "%d %s\n" % (success, r)
914 914
915 915 @wireprotocommand('known', 'nodes *')
916 916 def known(repo, proto, nodes, others):
917 917 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
918 918
919 919 @wireprotocommand('pushkey', 'namespace key old new')
920 920 def pushkey(repo, proto, namespace, key, old, new):
921 921 # compatibility with pre-1.8 clients which were accidentally
922 922 # sending raw binary nodes rather than utf-8-encoded hex
923 923 if len(new) == 20 and util.escapestr(new) != new:
924 924 # looks like it could be a binary node
925 925 try:
926 926 new.decode('utf-8')
927 927 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
928 928 except UnicodeDecodeError:
929 929 pass # binary, leave unmodified
930 930 else:
931 931 new = encoding.tolocal(new) # normal path
932 932
933 933 if util.safehasattr(proto, 'restore'):
934 934
935 935 proto.redirect()
936 936
937 937 try:
938 938 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
939 939 encoding.tolocal(old), new) or False
940 940 except error.Abort:
941 941 r = False
942 942
943 943 output = proto.restore()
944 944
945 945 return '%s\n%s' % (int(r), output)
946 946
947 947 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
948 948 encoding.tolocal(old), new)
949 949 return '%s\n' % int(r)
950 950
951 951 @wireprotocommand('stream_out')
952 952 def stream(repo, proto):
953 953 '''If the server supports streaming clone, it advertises the "stream"
954 954 capability with a value representing the version and flags of the repo
955 955 it is serving. Client checks to see if it understands the format.
956 956 '''
957 if not streamclone.allowservergeneration(repo):
958 return '1\n'
959
960 def getstream(it):
961 yield '0\n'
962 for chunk in it:
963 yield chunk
964
965 try:
966 # LockError may be raised before the first result is yielded. Don't
967 # emit output until we're sure we got the lock successfully.
968 it = streamclone.generatev1wireproto(repo)
969 return streamres(gen=getstream(it))
970 except error.LockError:
971 return '2\n'
957 return streamres(streamclone.generatev1wireproto(repo))
972 958
973 959 @wireprotocommand('unbundle', 'heads')
974 960 def unbundle(repo, proto, heads):
975 961 their_heads = decodelist(heads)
976 962
977 963 try:
978 964 proto.redirect()
979 965
980 966 exchange.check_heads(repo, their_heads, 'preparing changes')
981 967
982 968 # write bundle data to temporary file because it can be big
983 969 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
984 970 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
985 971 r = 0
986 972 try:
987 973 proto.getfile(fp)
988 974 fp.seek(0)
989 975 gen = exchange.readbundle(repo.ui, fp, None)
990 976 if (isinstance(gen, changegroupmod.cg1unpacker)
991 977 and not bundle1allowed(repo, 'push')):
992 978 if proto.name == 'http':
993 979 # need to special case http because stderr do not get to
994 980 # the http client on failed push so we need to abuse some
995 981 # other error type to make sure the message get to the
996 982 # user.
997 983 return ooberror(bundle2required)
998 984 raise error.Abort(bundle2requiredmain,
999 985 hint=bundle2requiredhint)
1000 986
1001 987 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1002 988 proto._client())
1003 989 if util.safehasattr(r, 'addpart'):
1004 990 # The return looks streamable, we are in the bundle2 case and
1005 991 # should return a stream.
1006 992 return streamres(gen=r.getchunks())
1007 993 return pushres(r)
1008 994
1009 995 finally:
1010 996 fp.close()
1011 997 os.unlink(tempname)
1012 998
1013 999 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1014 1000 # handle non-bundle2 case first
1015 1001 if not getattr(exc, 'duringunbundle2', False):
1016 1002 try:
1017 1003 raise
1018 1004 except error.Abort:
1019 1005 # The old code we moved used util.stderr directly.
1020 1006 # We did not change it to minimise code change.
1021 1007 # This need to be moved to something proper.
1022 1008 # Feel free to do it.
1023 1009 util.stderr.write("abort: %s\n" % exc)
1024 1010 if exc.hint is not None:
1025 1011 util.stderr.write("(%s)\n" % exc.hint)
1026 1012 return pushres(0)
1027 1013 except error.PushRaced:
1028 1014 return pusherr(str(exc))
1029 1015
1030 1016 bundler = bundle2.bundle20(repo.ui)
1031 1017 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1032 1018 bundler.addpart(out)
1033 1019 try:
1034 1020 try:
1035 1021 raise
1036 1022 except error.PushkeyFailed as exc:
1037 1023 # check client caps
1038 1024 remotecaps = getattr(exc, '_replycaps', None)
1039 1025 if (remotecaps is not None
1040 1026 and 'pushkey' not in remotecaps.get('error', ())):
1041 1027 # no support remote side, fallback to Abort handler.
1042 1028 raise
1043 1029 part = bundler.newpart('error:pushkey')
1044 1030 part.addparam('in-reply-to', exc.partid)
1045 1031 if exc.namespace is not None:
1046 1032 part.addparam('namespace', exc.namespace, mandatory=False)
1047 1033 if exc.key is not None:
1048 1034 part.addparam('key', exc.key, mandatory=False)
1049 1035 if exc.new is not None:
1050 1036 part.addparam('new', exc.new, mandatory=False)
1051 1037 if exc.old is not None:
1052 1038 part.addparam('old', exc.old, mandatory=False)
1053 1039 if exc.ret is not None:
1054 1040 part.addparam('ret', exc.ret, mandatory=False)
1055 1041 except error.BundleValueError as exc:
1056 1042 errpart = bundler.newpart('error:unsupportedcontent')
1057 1043 if exc.parttype is not None:
1058 1044 errpart.addparam('parttype', exc.parttype)
1059 1045 if exc.params:
1060 1046 errpart.addparam('params', '\0'.join(exc.params))
1061 1047 except error.Abort as exc:
1062 1048 manargs = [('message', str(exc))]
1063 1049 advargs = []
1064 1050 if exc.hint is not None:
1065 1051 advargs.append(('hint', exc.hint))
1066 1052 bundler.addpart(bundle2.bundlepart('error:abort',
1067 1053 manargs, advargs))
1068 1054 except error.PushRaced as exc:
1069 1055 bundler.newpart('error:pushraced', [('message', str(exc))])
1070 1056 return streamres(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now