bundlespec: rationalize the way we specify stream bundle version...
marmoute - r52451:c4aab366 default
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part header. When the header is empty
69 The total number of Bytes used by the part header. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """


import collections
import errno
import os
import re
import string
import struct
import sys

from .i18n import _
from .node import (
    hex,
    short,
)
from . import (
    bookmarks,
    changegroup,
    encoding,
    error,
    obsolete,
    phases,
    pushkey,
    pycompat,
    requirements,
    scmutil,
    streamclone,
    tags,
    url,
    util,
)
from .utils import (
    stringutil,
    urlutil,
)
from .interfaces import repository

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = b'>i'
_fpartheadersize = b'>i'
_fparttypesize = b'>B'
_fpartid = b'>I'
_fpayloadsize = b'>i'
_fpartparamcount = b'>BB'

preferedchunksize = 32768

_parttypeforbidden = re.compile(b'[^a-zA-Z0-9_:-]')
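As a rough illustration of how the struct formats above frame a part header
(a sketch only; `build_minimal_header` is a hypothetical helper, not part of
this module):

```python
import struct

# the same big-endian formats the module defines
_fpartheadersize = b'>i'   # int32: total header size (0 == end of stream)
_fparttypesize = b'>B'     # one byte: length of the part type name
_fpartid = b'>I'           # uint32: id unique within the bundle
_fpartparamcount = b'>BB'  # two bytes: mandatory / advisory parameter counts

def build_minimal_header(parttype, partid):
    # A header with no parameters: type size, type name, part id,
    # then a (0, 0) couple of parameter counts.
    header = (
        struct.pack(_fparttypesize, len(parttype))
        + parttype
        + struct.pack(_fpartid, partid)
        + struct.pack(_fpartparamcount, 0, 0)
    )
    # On the wire, the header is prefixed with its int32 size.
    return struct.pack(_fpartheadersize, len(header)) + header
```

Reading the frame back follows the same order: unpack the int32 size, then
the one-byte type length, the type name, the part id, and the two parameter
counts.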


def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool(b'devel', b'bundle2.debug'):
        ui.debug(b'bundle2-output: %s\n' % message)


def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool(b'devel', b'bundle2.debug'):
        ui.debug(b'bundle2-input: %s\n' % message)


def validateparttype(parttype):
    """raise ValueError if a parttype contains invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)


def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return b'>' + (b'BB' * nbparams)
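A sketch of how the format built by `_makefpartparamsizes` can decode the
`<param-sizes><param-data>` section described in the module docstring.
`read_part_params` is an illustrative helper under the assumption that
`data` starts at the sizes block; it is not part of this module.

```python
import struct

def _makefpartparamsizes(nbparams):
    # same dynamic format as above: one (key-size, value-size) byte
    # couple per parameter
    return b'>' + (b'BB' * nbparams)

def read_part_params(data, nbparams):
    # Decode `nbparams` (key, value) pairs: first the fixed-size block of
    # size couples, then the packed param-data blob that follows it.
    fmt = _makefpartparamsizes(nbparams)
    offset = struct.calcsize(fmt)
    sizes = struct.unpack(fmt, data[:offset])
    params = {}
    for i in range(nbparams):
        keysize, valsize = sizes[2 * i], sizes[2 * i + 1]
        key = data[offset:offset + keysize]
        offset += keysize
        params[key] = data[offset:offset + valsize]
        offset += valsize
    return params
```

A value size of zero yields an empty value, which is how flag-style
parameters travel in this section.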


parthandlermapping = {}


def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)

    def _decorator(func):
        lparttype = parttype.lower()  # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func

    return _decorator


class unbundlerecords:
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`. Where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__
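The recording behavior can be exercised on its own. This is a condensed
standalone copy of the class above, trimmed to the methods used, purely to
illustrate category lookup, chronological iteration, and reply routing:

```python
class unbundlerecords:
    # condensed copy of the class above, for illustration only
    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        # record under its category, in global order, and optionally as a
        # reply to another part id
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

records = unbundlerecords()
records.add(b'changegroup', {b'return': 1})
records.add(b'output', b'remote: ok\n', inreplyto=7)
```

`records[b'changegroup']` then returns the changegroup entries, iterating
`records` yields all `(category, entry)` tuples in order, and
`records.getreplies(7)` holds the entries routed to part 7.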


class bundleoperation:
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(
        self,
        repo,
        transactiongetter,
        captureoutput=True,
        source=b'',
        remote=None,
    ):
        self.repo = repo
        # the peer object that produced this bundle if available
        self.remote = remote
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.reply = None
        self.captureoutput = captureoutput
        self.hookargs = {}
        self._gettransaction = transactiongetter
        # carries values that can modify part behavior
        self.modes = {}
        self.source = source

    def gettransaction(self):
        transaction = self._gettransaction()

        if self.hookargs:
            # the ones added to the transaction supersede those added
            # to the operation.
            self.hookargs.update(transaction.hookargs)
            transaction.hookargs = self.hookargs

            # mark the hookargs as flushed. further attempts to add to
            # hookargs will result in an abort.
            self.hookargs = None

        return transaction

    def addhookargs(self, hookargs):
        if self.hookargs is None:
            raise error.ProgrammingError(
                b'attempted to add hookargs to '
                b'operation after transaction started'
            )
        self.hookargs.update(hookargs)


class TransactionUnavailable(RuntimeError):
    pass


def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()


def applybundle(repo, unbundler, tr, source, url=None, remote=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs[b'bundle2'] = b'1'
        if source is not None and b'source' not in tr.hookargs:
            tr.hookargs[b'source'] = source
        if url is not None and b'url' not in tr.hookargs:
            tr.hookargs[b'url'] = url
        return processbundle(
            repo, unbundler, lambda: tr, source=source, remote=remote
        )
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr, source=source, remote=remote)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op


class partiterator:
    def __init__(self, repo, op, unbundler):
        self.repo = repo
        self.op = op
        self.unbundler = unbundler
        self.iterator = None
        self.count = 0
        self.current = None

    def __enter__(self):
        def func():
            itr = enumerate(self.unbundler.iterparts(), 1)
            for count, p in itr:
                self.count = count
                self.current = p
                yield p
                p.consume()
                self.current = None

        self.iterator = func()
        return self.iterator

    def __exit__(self, type, exc, tb):
        if not self.iterator:
            return

        # Only gracefully abort in a normal exception situation. User aborts
        # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
        # and should not gracefully cleanup.
        if isinstance(exc, Exception):
            # Any exceptions seeking to the end of the bundle at this point are
            # almost certainly related to the underlying stream being bad.
            # And, chances are that the exception we're handling is related to
            # getting in that bad state. So, we swallow the seeking error and
            # re-raise the original error.
            seekerror = False
            try:
                if self.current:
                    # consume the part content to not corrupt the stream.
                    self.current.consume()

                for part in self.iterator:
                    # consume the bundle content
                    part.consume()
            except Exception:
                seekerror = True

            # Small hack to let caller code distinguish exceptions from bundle2
            # processing from processing the old format. This is mostly needed
            # to handle different return codes to unbundle according to the
            # type of bundle. We should probably clean up or drop this return
            # code craziness in a future version.
            exc.duringunbundle2 = True
            salvaged = []
            replycaps = None
            if self.op.reply is not None:
                salvaged = self.op.reply.salvageoutput()
                replycaps = self.op.reply.capabilities
            exc._replycaps = replycaps
            exc._bundle2salvagedoutput = salvaged

            # Re-raising from a variable loses the original stack. So only use
            # that form if we need to.
            if seekerror:
                raise exc

        self.repo.ui.debug(
            b'bundle2-input-bundle: %i parts total\n' % self.count
        )


def processbundle(
    repo,
    unbundler,
    transactiongetter=None,
    op=None,
    source=b'',
    remote=None,
):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(
            repo,
            transactiongetter,
            source=source,
            remote=remote,
        )
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = [b'bundle2-input-bundle:']
        if unbundler.params:
            msg.append(b' %i params' % len(unbundler.params))
        if op._gettransaction is None or op._gettransaction is _notransaction:
            msg.append(b' no-transaction')
        else:
            msg.append(b' with-transaction')
        msg.append(b'\n')
        repo.ui.debug(b''.join(msg))

    processparts(repo, op, unbundler)

    return op


def processparts(repo, op, unbundler):
    with partiterator(repo, op, unbundler) as parts:
        for part in parts:
            _processpart(op, part)


def _processchangegroup(op, cg, tr, source, url, **kwargs):
    if op.remote is not None and op.remote.path is not None:
        remote_path = op.remote.path
        kwargs = kwargs.copy()
        kwargs['delta_base_reuse_policy'] = remote_path.delta_reuse_policy
    ret = cg.apply(op.repo, tr, source, url, **kwargs)
    op.records.add(
        b'changegroup',
        {
            b'return': ret,
        },
    )
    return ret


def _gethandler(op, part):
    status = b'unknown'  # used by debug output
    try:
        handler = parthandlermapping.get(part.type)
        if handler is None:
            status = b'unsupported-type'
            raise error.BundleUnknownFeatureError(parttype=part.type)
        indebug(op.ui, b'found a handler for part %s' % part.type)
        unknownparams = part.mandatorykeys - handler.params
        if unknownparams:
            unknownparams = list(unknownparams)
            unknownparams.sort()
            status = b'unsupported-params (%s)' % b', '.join(unknownparams)
            raise error.BundleUnknownFeatureError(
                parttype=part.type, params=unknownparams
            )
        status = b'supported'
    except error.BundleUnknownFeatureError as exc:
        if part.mandatory:  # mandatory parts
            raise
        indebug(op.ui, b'ignoring unsupported advisory part %s' % exc)
        return  # skip to part processing
    finally:
        if op.ui.debugflag:
            msg = [b'bundle2-input-part: "%s"' % part.type]
            if not part.mandatory:
                msg.append(b' (advisory)')
            nbmp = len(part.mandatorykeys)
            nbap = len(part.params) - nbmp
            if nbmp or nbap:
                msg.append(b' (params:')
                if nbmp:
                    msg.append(b' %i mandatory' % nbmp)
                if nbap:
                    # report the advisory count, not the mandatory one
                    msg.append(b' %i advisory' % nbap)
                msg.append(b')')
            msg.append(b' %s\n' % status)
            op.ui.debug(b''.join(msg))

    return handler


def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    handler = _gethandler(op, part)
    if handler is None:
        return

    # handler is called outside the above try block so that we don't
    # risk catching KeyErrors from anything other than the
    # parthandlermapping lookup (any KeyError raised by handler()
    # itself represents a defect of a different variety).
    output = None
    if op.captureoutput and op.reply is not None:
        op.ui.pushbuffer(error=True, subproc=True)
        output = b''
    try:
        handler(op, part)
    finally:
        if output is not None:
            output = op.ui.popbuffer()
        if output:
            outpart = op.reply.newpart(b'output', data=output, mandatory=False)
            outpart.addparam(
                b'in-reply-to', pycompat.bytestr(part.id), mandatory=False
            )


def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if b'=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split(b'=', 1)
            vals = vals.split(b',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps


def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = b"%s=%s" % (ca, b','.join(vals))
        chunks.append(ca)
    return b'\n'.join(chunks)


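The caps blob format handled by `decodecaps`/`encodecaps` can be exercised with a small standalone sketch. This is a hypothetical re-implementation for illustration only: it uses `urllib.parse` directly instead of Mercurial's `urlreq` wrapper, and the helper names are invented.

```python
# Standalone sketch of the caps blob round-trip (hypothetical
# re-implementation; the real decodecaps/encodecaps use Mercurial's
# urlreq wrapper and live in mercurial.bundle2).
from urllib.parse import quote, unquote_to_bytes


def encode_caps(caps):
    # one "key=v1,v2" line per capability, keys sorted, values url-quoted
    chunks = []
    for key in sorted(caps):
        qkey = quote(key).encode('ascii')
        qvals = [quote(v).encode('ascii') for v in caps[key]]
        if qvals:
            qkey = b"%s=%s" % (qkey, b','.join(qvals))
        chunks.append(qkey)
    return b'\n'.join(chunks)


def decode_caps(blob):
    # invert encode_caps: one capability per line, optional value list
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if b'=' not in line:
            key, vals = line, []
        else:
            key, rest = line.split(b'=', 1)
            vals = [unquote_to_bytes(v) for v in rest.split(b',')]
        caps[unquote_to_bytes(key)] = vals
    return caps
```

For example, `{b'changegroup': [b'01', b'02'], b'digests': []}` encodes to the two-line blob `b'changegroup=01,02\ndigests'` and decodes back unchanged.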
bundletypes = {
    b"": (b"", b'UN'),  # only when using unbundle on ssh and old http servers
    # since the unification ssh accepts a header but there
    # is no capability signaling it.
    b"HG20": (),  # special-cased below
    b"HG10UN": (b"HG10UN", b'UN'),
    b"HG10BZ": (b"HG10", b'BZ'),
    b"HG10GZ": (b"HG10GZ", b'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = [b'HG10GZ', b'HG10BZ', b'HG10UN']


class bundle20:
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters, and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = b'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype(b'UN')
        self._compopts = None
        # If compression is being handled by a consumer of the raw
        # data (e.g. the wire protocol), unsetting this flag tells
        # consumers that the bundle is best left uncompressed.
        self.prefercompressed = True

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, b'UN'):
            return
        assert not any(n.lower() == b'compression' for n, v in self._params)
        self.addparam(b'Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise error.ProgrammingError(b'empty parameter name')
        if name[0:1] not in pycompat.bytestr(
            string.ascii_letters  # pytype: disable=wrong-arg-types
        ):
            raise error.ProgrammingError(
                b'non letter first character: %s' % name
            )
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts)  # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        As the part is directly added to the container, any failure to
        properly initialize the part after calling ``newpart`` should result
        in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = [b'bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(b' (%i params)' % len(self._params))
            msg.append(b' %i parts total\n' % len(self._parts))
            self.ui.debug(b''.join(msg))
        outdebug(self.ui, b'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, b'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(
            self._getcorechunk(), self._compopts
        ):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = b'%s=%s' % (par, value)
            blocks.append(par)
        return b' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, b'start of parts')
        for part in self._parts:
            outdebug(self.ui, b'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, b'end of bundle')
        yield _pack(_fpartheadersize, 0)

    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output."""
        salvaged = []
        for part in self._parts:
            if part.type.startswith(b'output'):
                salvaged.append(part.copy())
        return salvaged


class unpackermixin:
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)


def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != b'HG':
        ui.debug(
            b"error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version)
        )
        raise error.Abort(_(b'not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_(b'unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, b'start processing of %s stream' % magicstring)
    return unbundler


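The 4-byte magic string that `getunbundler` reads splits into a 2-byte magic and a 2-byte version. A minimal standalone sketch of that check (hypothetical helper; the real code additionally consults `formatmap` and raises `error.Abort`):

```python
# Sketch of the magic-string split done by getunbundler: the first two
# bytes must be b'HG', the next two name the bundle format version.
def split_magicstring(magicstring: bytes) -> bytes:
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != b'HG':
        raise ValueError('not a Mercurial bundle')
    return version
```

For example, `split_magicstring(b'HG20')` returns `b'20'`, the key used to look up `unbundle20` in `formatmap`.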
class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = b'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype(b'UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, b'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError(
                b'negative bundle param size: %i' % paramssize
            )
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process and record all parameters from a raw stream-level block"""
        params = util.sortdict()
        for p in paramsblock.split(b' '):
            p = p.split(b'=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params

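The stream-parameter block that `_processallparams` consumes is a space-separated list of url-quoted `name=value` pairs where the value is optional. A standalone sketch of just the parsing (hypothetical helper using `urllib.parse` instead of Mercurial's `urlreq`, and without the per-parameter side effects of `_processparam`):

```python
# Sketch of stream-parameter block parsing: space-separated,
# url-quoted "name[=value]" items; a missing value maps to None.
from urllib.parse import unquote_to_bytes


def parse_stream_params(block: bytes):
    params = {}
    for item in block.split(b' '):
        pieces = [unquote_to_bytes(i) for i in item.split(b'=', 1)]
        if len(pieces) < 2:
            pieces.append(None)
        params[pieces[0]] = pieces[1]
    return params
```

For example, `parse_stream_params(b'Compression=BZ foo')` yields `{b'Compression': b'BZ', b'foo': None}`.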
    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory, and this function will raise a KeyError when they are
        unknown.

        Note: no options are currently supported. Any input will either be
        ignored or fail.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0:1] not in pycompat.bytestr(
            string.ascii_letters  # pytype: disable=wrong-arg-types
        ):
            raise ValueError('non letter first character: %s' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0:1].islower():
                indebug(self.ui, b"ignoring unknown parameter %s" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends, relying on the bundle
        being interpreted to know its end. This is terrible and we are sorry,
        but we needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError(
                b'negative bundle param size: %i' % paramssize
            )
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            # The payload itself is decompressed below, so drop
            # the compression parameter passed down to compensate.
            outparams = []
            for p in params.split(b' '):
                k, v = p.split(b'=', 1)
                if k.lower() != b'compression':
                    outparams.append(p)
            outparams = b' '.join(outparams)
            yield _pack(_fstreamparamsize, len(outparams))
            yield outparams
        else:
            yield _pack(_fstreamparamsize, paramssize)
        # From there, the payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError(b'negative chunk size: %i' % size)
            yield self._readexact(size)

    def iterparts(self, seekable=False):
        """yield all parts contained in the stream"""
        cls = seekableunbundlepart if seekable else unbundlepart
        # make sure params have been loaded
        self.params
        # From there, the payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, b'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = cls(self.ui, headerblock, self._fp)
            yield part
            # Ensure part is fully consumed so we can start reading the next
            # part.
            part.consume()

            headerblock = self._readpartheader()
        indebug(self.ui, b'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError(
                b'negative part header size: %i' % headersize
            )
        indebug(self.ui, b'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params  # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if hasattr(self._fp, 'close'):
            return self._fp.close()


formatmap = {b'20': unbundle20}

b2streamparamsmap = {}


def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""

    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func

    return decorator


@b2streamparamhandler(b'compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,), values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True


1013 class bundlepart:
1013 class bundlepart:
1014 """A bundle2 part contains application level payload
1014 """A bundle2 part contains application level payload
1015
1015
1016 The part `type` is used to route the part to the application level
1016 The part `type` is used to route the part to the application level
1017 handler.
1017 handler.
1018
1018
1019 The part payload is contained in ``part.data``. It could be raw bytes or a
1019 The part payload is contained in ``part.data``. It could be raw bytes or a
1020 generator of byte chunks.
1020 generator of byte chunks.
1021
1021
1022 You can add parameters to the part using the ``addparam`` method.
1022 You can add parameters to the part using the ``addparam`` method.
1023 Parameters can be either mandatory (default) or advisory. Remote side
1023 Parameters can be either mandatory (default) or advisory. Remote side
1024 should be able to safely ignore the advisory ones.
1024 should be able to safely ignore the advisory ones.
1025
1025
1026 Both data and parameters cannot be modified after the generation has begun.
1026 Both data and parameters cannot be modified after the generation has begun.
1027 """
1027 """
1028
1028
    def __init__(
        self,
        parttype,
        mandatoryparams=(),
        advisoryparams=(),
        data=b'',
        mandatory=True,
    ):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError(b'duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return '<%s object at %x; id: %s; type: %s; mandatory: %s>' % (
            cls,
            id(self),
            self.id,
            self.type,
            self.mandatory,
        )

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not hasattr(self.data, 'next')
        return self.__class__(
            self.type,
            self._mandatoryparams,
            self._advisoryparams,
            self._data,
            self.mandatory,
        )

    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError(b'part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value=b'', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError(b'part is being generated')
        if name in self._seenparams:
            raise ValueError(b'duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError(b'part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = [b'bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(b' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(b' (params:')
                if nbmp:
                    msg.append(b' %i mandatory' % nbmp)
                if nbap:
                    msg.append(b' %i advisory' % nbap)
                msg.append(b')')
            if not self.data:
                msg.append(b' empty payload')
            elif hasattr(self.data, 'next') or hasattr(self.data, '__next__'):
                msg.append(b' streamed payload')
            else:
                msg.append(b' %i bytes payload' % len(self.data))
            msg.append(b'\n')
            ui.debug(b''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, b'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
        ## parttype
        header = [
            _pack(_fparttypesize, len(parttype)),
            parttype,
            _pack(_fpartid, self.id),
        ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        try:
            headerchunk = b''.join(header)
        except TypeError:
            raise TypeError(
                'Found a non-bytes trying to '
                'build bundle part header: %r' % header
            )
        outdebug(ui, b'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, b'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug(b'bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            bexc = stringutil.forcebytestr(exc)
            # backup exception data for later
            ui.debug(
                b'bundle2-input-stream-interrupt: encoding exception %s' % bexc
            )
            tb = sys.exc_info()[2]
            msg = b'unexpected error: %s' % bexc
            interpart = bundlepart(
                b'error:abort', [(b'message', msg)], mandatory=False
            )
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, b'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, b'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if hasattr(self.data, 'next') or hasattr(self.data, '__next__'):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


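The header framing that ``getchunks`` emits can be summarized in a small standalone sketch. This is an illustrative helper, not Mercurial's API: it assumes the format strings match the module's ``_fparttypesize`` (``>B``), ``_fpartid`` (``>I``), ``_fpartparamcount`` (``>BB``), and per-parameter size bytes, with the part type upper-cased to signal a mandatory part.

```python
# Minimal sketch of the bundle2 part header layout produced by
# ``bundlepart.getchunks`` (hypothetical helper for illustration only):
#   1 byte   part type length
#   N bytes  part type (upper-case == mandatory part)
#   4 bytes  part id (big-endian)
#   2 bytes  mandatory and advisory parameter counts
#   2 bytes  per parameter: key size, value size
#   then the raw key and value bytes, mandatory parameters first.
import struct

def pack_part_header(parttype, partid, manpar, advpar, mandatory=True):
    ptype = parttype.upper() if mandatory else parttype.lower()
    header = [
        struct.pack('>B', len(ptype)),
        ptype,
        struct.pack('>I', partid),
        struct.pack('>BB', len(manpar), len(advpar)),
    ]
    sizes = []
    for key, value in manpar + advpar:
        sizes.extend((len(key), len(value)))
    header.append(struct.pack('>%dB' % len(sizes), *sizes))
    for key, value in manpar + advpar:
        header.append(key)
        header.append(value)
    return b''.join(header)
```

On the wire this header blob is itself preceded by its size (``_fpartheadersize``), which is how the unbundler knows how many bytes to hand to ``unbundlepart._readheader``.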
flaginterrupt = -1


class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows exceptions raised on the producer side during part iteration
    to be transmitted while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError(
                b'negative part header size: %i' % headersize
            )
        indebug(self.ui, b'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):
        self.ui.debug(
            b'bundle2-input-stream-interrupt: opening out of band context\n'
        )
        indebug(self.ui, b'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, b'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        hardabort = False
        try:
            _processpart(op, part)
        except (SystemExit, KeyboardInterrupt):
            hardabort = True
            raise
        finally:
            if not hardabort:
                part.consume()
        self.ui.debug(
            b'bundle2-input-stream-interrupt: closing out of band context\n'
        )


class interruptoperation:
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError(b'no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable(b'no repo access from stream interruption')


def decodepayloadchunks(ui, fh):
    """Reads bundle2 part payload data into chunks.

    Part payload data consists of framed chunks. This function takes
    a file handle and emits those chunks.
    """
    dolog = ui.configbool(b'devel', b'bundle2.debug')
    debug = ui.debug

    headerstruct = struct.Struct(_fpayloadsize)
    headersize = headerstruct.size
    unpack = headerstruct.unpack

    readexactly = changegroup.readexactly
    read = fh.read

    chunksize = unpack(readexactly(fh, headersize))[0]
    indebug(ui, b'payload chunk size: %i' % chunksize)

    # changegroup.readexactly() is inlined below for performance.
    while chunksize:
        if chunksize >= 0:
            s = read(chunksize)
            if len(s) < chunksize:
                raise error.Abort(
                    _(
                        b'stream ended unexpectedly'
                        b' (got %d bytes, expected %d)'
                    )
                    % (len(s), chunksize)
                )

            yield s
        elif chunksize == flaginterrupt:
            # Interrupt "signal" detected. The regular stream is interrupted
            # and a bundle2 part follows. Consume it.
            interrupthandler(ui, fh)()
        else:
            raise error.BundleValueError(
                b'negative payload chunk size: %i' % chunksize
            )

        s = read(headersize)
        if len(s) < headersize:
            raise error.Abort(
                _(b'stream ended unexpectedly (got %d bytes, expected %d)')
                % (len(s), headersize)
            )

        chunksize = unpack(s)[0]

        # indebug() inlined for performance.
        if dolog:
            debug(b'bundle2-input: payload chunk size: %i\n' % chunksize)


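The framing that ``decodepayloadchunks`` consumes can be illustrated with a small standalone decoder. This is a hypothetical helper under the assumption that payload sizes use the module's ``_fpayloadsize`` format (signed 32-bit big-endian): a positive size introduces a data chunk, ``0`` terminates the payload, and ``-1`` (``flaginterrupt``) announces an out-of-band part, which this sketch does not handle.

```python
# Illustrative decoder (not Mercurial's API) for bundle2 payload framing:
# each chunk is preceded by a signed 32-bit big-endian size; 0 ends the
# payload, -1 signals an interleaved out-of-band part.
import io
import struct

FLAG_INTERRUPT = -1

def iter_payload_chunks(fh):
    """Yield the data chunks of one framed payload from file object ``fh``."""
    while True:
        size = struct.unpack('>i', fh.read(4))[0]
        if size == 0:  # end-of-payload marker
            return
        if size == FLAG_INTERRUPT:
            raise NotImplementedError('out-of-band part not handled here')
        if size < 0:
            raise ValueError('negative payload chunk size: %i' % size)
        data = fh.read(size)
        if len(data) < size:
            raise EOFError('stream ended unexpectedly')
        yield data

# two chunks (b'abc', b'de') followed by the 0 terminator
payload = (
    struct.pack('>i', 3) + b'abc'
    + struct.pack('>i', 2) + b'de'
    + struct.pack('>i', 0)
)
```

The real function additionally inlines ``changegroup.readexactly`` for speed and dispatches ``-1`` frames to ``interrupthandler``.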
class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = hasattr(fp, 'seek') and hasattr(fp, 'tell')
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._readheader()
        self._mandatory = None
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset : (offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic-related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _readheader(self):
        """read the header and set up the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, b'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, b'part id: "%s"' % pycompat.bytestr(self.id))
        # extract mandatory bit from type
        self.mandatory = self.type != self.type.lower()
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, b'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def _payloadchunks(self):
        """Generator of decoded chunks in the payload."""
        return decodepayloadchunks(self.ui, self._fp)

    def consume(self):
        """Read the part payload until completion.

        By consuming the part data, the underlying stream read offset will
        be advanced to the next part (or end of stream).
        """
        if self.consumed:
            return

        chunk = self.read(32768)
        while chunk:
            self._pos += len(chunk)
            chunk = self.read(32768)

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug(
                    b'bundle2-input-part: total payload size %i\n' % self._pos
                )
            self.consumed = True
        return data


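The (payload offset, file offset) index that ``seekableunbundlepart`` builds below can be exercised in isolation. This is a hypothetical standalone helper mirroring the logic of ``_findchunk``: given the list of chunk start offsets in the decoded payload, recover which chunk holds a position and the remaining offset inside it.

```python
# Minimal sketch (illustration only) of the chunk-index lookup used by
# ``seekableunbundlepart._findchunk``: ``chunkindex`` holds
# (payload offset, file offset) pairs for each chunk start, in order.
def find_chunk(chunkindex, pos):
    """Return (chunk number, offset within that chunk) for payload ``pos``."""
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0
        elif ppos > pos:
            # pos falls inside the previous chunk
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('position %d past the indexed payload' % pos)

# chunk 0 starts at payload offset 0, chunk 1 at 10, chunk 2 at 25
# (file offsets are arbitrary here)
index = [(0, 100), (10, 230), (25, 410)]
```

Because the index is only extended as chunks are decoded, seeking forward past the indexed region first has to consume the stream, which is exactly what ``seek`` with ``os.SEEK_END`` does below.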
class seekableunbundlepart(unbundlepart):
    """A bundle2 part in a bundle that is seekable.

    Regular ``unbundlepart`` instances can only be read once. This class
    extends ``unbundlepart`` to enable bi-directional seeking within the
    part.

    Bundle2 part data consists of framed chunks. Offsets when seeking
    refer to the decoded data, not the offsets in the underlying bundle2
    stream.

    To facilitate quickly seeking within the decoded data, instances of this
    class maintain a mapping between offsets in the underlying stream and
    the decoded payload. This mapping will consume memory in proportion
    to the number of chunks within the payload (which almost certainly
    increases in proportion with the size of the part).
    """

    def __init__(self, ui, header, fp):
        # (payload, file) offsets for chunk starts.
        self._chunkindex = []

        super(seekableunbundlepart, self).__init__(ui, header, fp)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, b'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), (
                b'Unknown chunk %d' % chunknum
            )
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]

        for chunk in decodepayloadchunks(self.ui, self._fp):
            chunknum += 1
            pos += len(chunk)
            if chunknum == len(self._chunkindex):
                self._chunkindex.append((pos, self._tellfp()))

            yield chunk

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError(b'Unknown chunk')

    def tell(self):
        return self._pos

    def seek(self, offset, whence=os.SEEK_SET):
        if whence == os.SEEK_SET:
            newpos = offset
        elif whence == os.SEEK_CUR:
            newpos = self._pos + offset
        elif whence == os.SEEK_END:
            if not self.consumed:
                # Can't use self.consume() here because it advances self._pos.
                chunk = self.read(32768)
                while chunk:
                    chunk = self.read(32768)
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError(b'Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
1565 # Can't use self.consume() here because it advances self._pos.
1565 # Can't use self.consume() here because it advances self._pos.
1566 chunk = self.read(32768)
1566 chunk = self.read(32768)
1567 while chunk:
1567 while chunk:
1568 chunk = self.read(32668)
1568 chunk = self.read(32668)
1569
1569
1570 if not 0 <= newpos <= self._chunkindex[-1][0]:
1570 if not 0 <= newpos <= self._chunkindex[-1][0]:
1571 raise ValueError(b'Offset out of range')
1571 raise ValueError(b'Offset out of range')
1572
1572
1573 if self._pos != newpos:
1573 if self._pos != newpos:
1574 chunk, internaloffset = self._findchunk(newpos)
1574 chunk, internaloffset = self._findchunk(newpos)
1575 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1575 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1576 adjust = self.read(internaloffset)
1576 adjust = self.read(internaloffset)
1577 if len(adjust) != internaloffset:
1577 if len(adjust) != internaloffset:
1578 raise error.Abort(_(b'Seek failed\n'))
1578 raise error.Abort(_(b'Seek failed\n'))
1579 self._pos = newpos
1579 self._pos = newpos
1580
1580
1581 def _seekfp(self, offset, whence=0):
1581 def _seekfp(self, offset, whence=0):
1582 """move the underlying file pointer
1582 """move the underlying file pointer
1583
1583
1584 This method is meant for internal usage by the bundle2 protocol only.
1584 This method is meant for internal usage by the bundle2 protocol only.
1585 They directly manipulate the low level stream including bundle2 level
1585 They directly manipulate the low level stream including bundle2 level
1586 instruction.
1586 instruction.
1587
1587
1588 Do not use it to implement higher-level logic or methods."""
1588 Do not use it to implement higher-level logic or methods."""
1589 if self._seekable:
1589 if self._seekable:
1590 return self._fp.seek(offset, whence)
1590 return self._fp.seek(offset, whence)
1591 else:
1591 else:
1592 raise NotImplementedError(_(b'File pointer is not seekable'))
1592 raise NotImplementedError(_(b'File pointer is not seekable'))
1593
1593
1594 def _tellfp(self):
1594 def _tellfp(self):
1595 """return the file offset, or None if file is not seekable
1595 """return the file offset, or None if file is not seekable
1596
1596
1597 This method is meant for internal usage by the bundle2 protocol only.
1597 This method is meant for internal usage by the bundle2 protocol only.
1598 They directly manipulate the low level stream including bundle2 level
1598 They directly manipulate the low level stream including bundle2 level
1599 instruction.
1599 instruction.
1600
1600
1601 Do not use it to implement higher-level logic or methods."""
1601 Do not use it to implement higher-level logic or methods."""
1602 if self._seekable:
1602 if self._seekable:
1603 try:
1603 try:
1604 return self._fp.tell()
1604 return self._fp.tell()
1605 except IOError as e:
1605 except IOError as e:
1606 if e.errno == errno.ESPIPE:
1606 if e.errno == errno.ESPIPE:
1607 self._seekable = False
1607 self._seekable = False
1608 else:
1608 else:
1609 raise
1609 raise
1610 return None
1610 return None
1611
1611
1612
1612
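The chunk index above is the heart of the seek support: it records, for each decoded chunk, both its position in the decoded payload and the corresponding offset in the underlying file, so a seek can jump to the nearest chunk boundary and re-read only the remainder. A minimal standalone sketch of that idea (not Mercurial's API; ``ChunkIndex`` and its methods are illustrative names):

```python
# Sketch of the (payload_offset, file_offset) mapping maintained by
# seekableunbundlepart._chunkindex. Assumes each on-disk chunk carries a
# fixed amount of framing overhead in addition to its payload bytes.

class ChunkIndex:
    """Record (payload_offset, file_offset) pairs for chunk starts."""

    def __init__(self):
        # chunk 0 starts at payload offset 0 and file offset 0
        self.entries = [(0, 0)]

    def record(self, payload_len, file_advance):
        # Append the start offsets of the next chunk.
        ppos, fpos = self.entries[-1]
        self.entries.append((ppos + payload_len, fpos + file_advance))

    def find(self, pos):
        # Return (chunk_number, offset_within_chunk) for a payload position,
        # mirroring the linear scan in _findchunk above.
        for chunk, (ppos, _fpos) in enumerate(self.entries):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self.entries[chunk - 1][0]
        raise ValueError('unknown chunk')

idx = ChunkIndex()
idx.record(10, 14)  # chunk 0: 10 payload bytes, 14 bytes on disk (framing)
idx.record(10, 14)  # chunk 1: same shape
```

A seek to payload offset 15 would resolve to chunk 1, then read and discard 5 bytes, which is exactly what `seek()` does via `_findchunk` and the `adjust` read.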
# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {
    b'HG20': (),
    b'bookmarks': (),
    b'error': (b'abort', b'unsupportedcontent', b'pushraced', b'pushkey'),
    b'listkeys': (),
    b'pushkey': (),
    b'digests': tuple(sorted(util.DIGESTS.keys())),
    b'remote-changegroup': (b'http', b'https'),
    b'hgtagsfnodes': (),
    b'phases': (b'heads',),
    b'stream': (b'v2',),
}


def getrepocaps(repo, allowpushback=False, role=None):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.

    The returned value is used for servers advertising their capabilities as
    well as clients advertising their capabilities to servers as part of
    bundle2 requests. The ``role`` argument specifies which is which.
    """
    if role not in (b'client', b'server'):
        raise error.ProgrammingError(b'role argument must be client or server')

    caps = capabilities.copy()
    caps[b'changegroup'] = tuple(
        sorted(changegroup.supportedincomingversions(repo))
    )
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple(b'V%i' % v for v in obsolete.formats)
        caps[b'obsmarkers'] = supportedformat
    if allowpushback:
        caps[b'pushback'] = ()
    cpmode = repo.ui.config(b'server', b'concurrent-push-mode')
    if cpmode == b'check-related':
        caps[b'checkheads'] = (b'related',)
    if b'phases' in repo.ui.configlist(b'devel', b'legacy.exchange'):
        caps.pop(b'phases')

    # Don't advertise stream clone support in server mode if not configured.
    if role == b'server':
        streamsupported = repo.ui.configbool(
            b'server', b'uncompressed', untrusted=True
        )
        featuresupported = repo.ui.configbool(b'server', b'bundle2.stream')

        if not streamsupported or not featuresupported:
            caps.pop(b'stream')
    # Else always advertise support on client, because payload support
    # should always be advertised.

    if repo.ui.configbool(b'experimental', b'stream-v3'):
        if b'stream' in caps:
            caps[b'stream'] += (b'v3-exp',)

    # 'rev-branch-cache' is no longer advertised, but still supported
    # for legacy clients.

    return caps


def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable(b'bundle2')
    if not raw and raw != b'':
        return {}
    capsblob = urlreq.unquote(remote.capable(b'bundle2'))
    return decodecaps(capsblob)


def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict"""
    obscaps = caps.get(b'obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith(b'V')]


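The version tokens parsed by ``obsmarkersversion`` have the shape ``b'V<n>'`` (e.g. the ``b'V1'`` advertised in ``getrepocaps``). A standalone usage sketch of that parsing (``parse_obsmarker_versions`` is an illustrative name; the real function takes a bundle2 capabilities dict):

```python
# b'V1' -> 1; tokens that do not start with b'V' are ignored, matching the
# filtering done by obsmarkersversion above.
def parse_obsmarker_versions(obscaps):
    return [int(c[1:]) for c in obscaps if c.startswith(b'V')]

versions = parse_obsmarker_versions((b'V0', b'V1', b'other'))
```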
def writenewbundle(
    ui,
    repo,
    source,
    filename,
    bundletype,
    outgoing,
    opts,
    vfs=None,
    compression=None,
    compopts=None,
    allow_internal=False,
):
    if bundletype.startswith(b'HG10'):
        cg = changegroup.makechangegroup(repo, outgoing, b'01', source)
        return writebundle(
            ui,
            cg,
            filename,
            bundletype,
            vfs=vfs,
            compression=compression,
            compopts=compopts,
        )
    elif not bundletype.startswith(b'HG20'):
        raise error.ProgrammingError(b'unknown bundle type: %s' % bundletype)

    # enforce that no internal phases are to be bundled
    bundled_internal = repo.revs(b"%ln and _internal()", outgoing.ancestorsof)
    if bundled_internal and not allow_internal:
        count = len(repo.revs(b'%ln and _internal()', outgoing.missing))
        msg = "backup bundle would contain %d internal changesets"
        msg %= count
        raise error.ProgrammingError(msg)

    caps = {}
    if opts.get(b'obsolescence', False):
        caps[b'obsmarkers'] = (b'V1',)
    stream_version = opts.get(b'stream', b"")
    if stream_version == b"v2":
        caps[b'stream'] = [b'v2']
    elif stream_version == b"v3-exp":
        caps[b'stream'] = [b'v3-exp']
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)


def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The type of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we might not always want a changegroup in such bundle, for example in
    # stream bundles
    if opts.get(b'changegroup', True):
        cgversion = opts.get(b'cg.version')
        if cgversion is None:
            cgversion = changegroup.safeversion(repo)
        cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
        part = bundler.newpart(b'changegroup', data=cg.getchunks())
        part.addparam(b'version', cg.version)
        if b'clcount' in cg.extras:
            part.addparam(
                b'nbchanges', b'%d' % cg.extras[b'clcount'], mandatory=False
            )
        if opts.get(b'phases'):
            target_phase = phases.draft
            for head in outgoing.ancestorsof:
                target_phase = max(target_phase, repo[head].phase())
            if target_phase > phases.draft:
                part.addparam(
                    b'targetphase',
                    b'%d' % target_phase,
                    mandatory=False,
                )
        if repository.REPO_FEATURE_SIDE_DATA in repo.features:
            part.addparam(b'exp-sidedata', b'1')

    if opts.get(b'stream', b"") == b"v2":
        addpartbundlestream2(bundler, repo, stream=True)

    if opts.get(b'stream', b"") == b"v3-exp":
        addpartbundlestream2(bundler, repo, stream=True)

    if opts.get(b'tagsfnodescache', True):
        addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get(b'revbranchcache', True):
        addpartrevbranchcache(repo, bundler, outgoing)

    if opts.get(b'obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(
            bundler,
            obsmarkers,
            mandatory=opts.get(b'obsolescence-mandatory', True),
        )

    if opts.get(b'phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = phases.binaryencode(headsbyphase)
        bundler.newpart(b'phase-heads', data=phasedata)


def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changesets
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.ancestorsof:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart(
            b'hgtagsfnodes',
            mandatory=False,
            data=b''.join(chunks),
        )


def addpartrevbranchcache(repo, bundler, outgoing):
    # we include the rev branch cache for the bundle changesets
    # (as an optional part)
    cache = repo.revbranchcache()
    cl = repo.unfiltered().changelog
    branchesdata = collections.defaultdict(lambda: (set(), set()))
    for node in outgoing.missing:
        branch, close = cache.branchinfo(cl.rev(node))
        branchesdata[branch][close].add(node)

    def generate():
        for branch, (nodes, closed) in sorted(branchesdata.items()):
            utf8branch = encoding.fromlocal(branch)
            yield rbcstruct.pack(len(utf8branch), len(nodes), len(closed))
            yield utf8branch
            for n in sorted(nodes):
                yield n
            for n in sorted(closed):
                yield n

    bundler.newpart(b'cache:rev-branch-cache', data=generate(), mandatory=False)


def _formatrequirementsspec(requirements):
    requirements = [req for req in requirements if req != b"shared"]
    return urlreq.quote(b','.join(sorted(requirements)))


def _formatrequirementsparams(requirements):
    requirements = _formatrequirementsspec(requirements)
    params = b"%s%s" % (urlreq.quote(b"requirements="), requirements)
    return params


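The spec is just the sorted requirements joined with commas and percent-encoded, with ``shared`` filtered out (it describes the local checkout, not the streamed store). A standalone sketch, assuming Mercurial's ``urlreq.quote`` behaves like the stdlib ``urllib.parse.quote`` (``format_requirements_spec`` is an illustrative name):

```python
from urllib.parse import quote

def format_requirements_spec(requirements):
    # Drop 'shared', sort for a stable wire format, then percent-encode
    # (the comma separator becomes %2C).
    reqs = [req for req in requirements if req != "shared"]
    return quote(','.join(sorted(reqs)))

spec = format_requirements_spec({"revlogv1", "store", "shared"})
```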
def format_remote_wanted_sidedata(repo):
    """Formats a repo's wanted sidedata categories into a bytestring for
    capabilities exchange."""
    wanted = b""
    if repo._wanted_sidedata:
        wanted = b','.join(
            pycompat.bytestr(c) for c in sorted(repo._wanted_sidedata)
        )
    return wanted


def read_remote_wanted_sidedata(remote):
    sidedata_categories = remote.capable(b'exp-wanted-sidedata')
    return read_wanted_sidedata(sidedata_categories)


def read_wanted_sidedata(formatted):
    if formatted:
        return set(formatted.split(b','))
    return set()


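The two directions above form a simple round trip: categories are joined with ``b','`` for the wire and split back into a set on receipt. A self-contained sketch (``encode_wanted``/``decode_wanted`` are illustrative names, not Mercurial's):

```python
def encode_wanted(categories):
    # Sort for a deterministic wire representation, as
    # format_remote_wanted_sidedata does.
    return b','.join(sorted(categories))

def decode_wanted(formatted):
    # Empty input means no wanted categories, as in read_wanted_sidedata.
    return set(formatted.split(b',')) if formatted else set()

cats = {b'changeset-copies', b'changeset-files'}
wire = encode_wanted(cats)
```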
def addpartbundlestream2(bundler, repo, **kwargs):
    if not kwargs.get('stream', False):
        return

    if not streamclone.allowservergeneration(repo):
        msg = _(b'stream data requested but server does not allow this feature')
        hint = _(b'the client seems buggy')
        raise error.Abort(msg, hint=hint)
    if b'stream' not in bundler.capabilities:
        msg = _(
            b'stream data requested but supported streaming clone versions were not specified'
        )
        hint = _(b'the client seems buggy')
        raise error.Abort(msg, hint=hint)
    client_supported = set(bundler.capabilities[b'stream'])
    server_supported = set(getrepocaps(repo, role=b'client').get(b'stream', []))
    common_supported = client_supported & server_supported
    if not common_supported:
        msg = _(b'no common supported version with the client: %s; %s')
        str_server = b','.join(sorted(server_supported))
        str_client = b','.join(sorted(client_supported))
        msg %= (str_server, str_client)
        raise error.Abort(msg)
    version = max(common_supported)

    # Stream clones don't compress well. And compression undermines a
    # goal of stream clones, which is to be fast. Communicate the desire
    # to avoid compression to consumers of the bundle.
    bundler.prefercompressed = False

    # get the includes and excludes
    includepats = kwargs.get('includepats')
    excludepats = kwargs.get('excludepats')

    narrowstream = repo.ui.configbool(
        b'experimental', b'server.stream-narrow-clones'
    )

    if (includepats or excludepats) and not narrowstream:
        raise error.Abort(_(b'server does not support narrow stream clones'))

    includeobsmarkers = False
    if repo.obsstore:
        remoteversions = obsmarkersversion(bundler.capabilities)
        if not remoteversions:
            raise error.Abort(
                _(
                    b'server has obsolescence markers, but client '
                    b'cannot receive them via stream clone'
                )
            )
        elif repo.obsstore._version in remoteversions:
            includeobsmarkers = True

    if version == b"v2":
        filecount, bytecount, it = streamclone.generatev2(
            repo, includepats, excludepats, includeobsmarkers
        )
        requirements = streamclone.streamed_requirements(repo)
        requirements = _formatrequirementsspec(requirements)
        part = bundler.newpart(b'stream2', data=it)
        part.addparam(b'bytecount', b'%d' % bytecount, mandatory=True)
        part.addparam(b'filecount', b'%d' % filecount, mandatory=True)
        part.addparam(b'requirements', requirements, mandatory=True)
    elif version == b"v3-exp":
        it = streamclone.generatev3(
            repo, includepats, excludepats, includeobsmarkers
        )
        requirements = streamclone.streamed_requirements(repo)
        requirements = _formatrequirementsspec(requirements)
        part = bundler.newpart(b'stream3-exp', data=it)
        part.addparam(b'requirements', requirements, mandatory=True)


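The version negotiation in `addpartbundlestream2` is a set intersection followed by `max()`; bytestring comparison happens to rank `b'v3-exp'` above `b'v2'`, so the newest common version wins. A minimal model of just that step (``pick_stream_version`` is an illustrative name):

```python
def pick_stream_version(client_supported, server_supported):
    # Intersect the advertised version sets and take the lexicographic
    # maximum, mirroring the negotiation in addpartbundlestream2.
    common = set(client_supported) & set(server_supported)
    if not common:
        raise ValueError('no common supported stream version')
    return max(common)

v = pick_stream_version([b'v2', b'v3-exp'], [b'v2'])
```

Note this relies on newer version names sorting last; a numbering scheme that broke that ordering would need an explicit preference list instead.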
def buildobsmarkerspart(bundler, markers, mandatory=True):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError(b'bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart(b'obsmarkers', data=stream, mandatory=mandatory)


def writebundle(
    ui, cg, filename, bundletype, vfs=None, compression=None, compopts=None
):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == b"HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart(b'changegroup', data=cg.getchunks())
        part.addparam(b'version', cg.version)
        if b'clcount' in cg.extras:
            part.addparam(
                b'nbchanges', b'%d' % cg.extras[b'clcount'], mandatory=False
            )
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != b'01':
            raise error.Abort(
                _(b'old bundle types only support v1 changegroups')
            )

        # HG20 is the case without 2 values to unpack, but is handled above.
        # pytype: disable=bad-unpacking
        header, comp = bundletypes[bundletype]
        # pytype: enable=bad-unpacking

        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_(b'unknown stream compression type: %s') % comp)
2015 compengine = util.compengines.forbundletype(comp)
2016 compengine = util.compengines.forbundletype(comp)
2016
2017
2017 def chunkiter():
2018 def chunkiter():
2018 yield header
2019 yield header
2019 for chunk in compengine.compressstream(cg.getchunks(), compopts):
2020 for chunk in compengine.compressstream(cg.getchunks(), compopts):
2020 yield chunk
2021 yield chunk
2021
2022
2022 chunkiter = chunkiter()
2023 chunkiter = chunkiter()
2023
2024
2024 # parse the changegroup data, otherwise we will block
2025 # parse the changegroup data, otherwise we will block
2025 # in case of sshrepo because we don't know the end of the stream
2026 # in case of sshrepo because we don't know the end of the stream
2026 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
2027 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
2027
2028
2028
2029
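The legacy (non-HG20) branch above produces nothing more exotic than a magic header followed by a compressed payload stream. A minimal standalone sketch of that on-disk shape, using `zlib` directly instead of Mercurial's `compengines` table (the `HG10GZ` constant and helper names here are illustrative, not Mercurial's API):

```python
import zlib

# Illustrative header; real values come from Mercurial's bundletypes table.
HEADER = b"HG10GZ"

def write_legacy_bundle(chunks):
    """Yield the magic header, then the zlib-compressed payload chunks."""
    yield HEADER
    comp = zlib.compressobj()
    for chunk in chunks:
        data = comp.compress(chunk)
        if data:
            yield data
    yield comp.flush()

def read_legacy_bundle(stream):
    """Inverse: verify the header, then decompress the remainder."""
    assert stream[: len(HEADER)] == HEADER
    return zlib.decompress(stream[len(HEADER):])
```

This mirrors the `chunkiter()` generator above: the header goes out first, then the compressed changegroup chunks, so a reader can sniff the bundle type from the first bytes before committing to a decompressor.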
def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get(b'return', 0) for r in op.records[b'changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

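The combination rule above can be sketched as a pure function over plain ints, which makes its three cases easy to exercise: any 0 means failure and wins outright, return codes greater than 1 count as "heads added", and codes less than -1 count as "heads removed":

```python
def combine_changegroup_results(results):
    """Combine addchangegroup return codes the same way as above:
    0 is an immediate failure; values > 1 or < -1 accumulate head-count
    deltas on top of a +1 / -1 baseline."""
    changedheads = 0
    for ret in results:
        if ret == 0:          # any failed changegroup fails the whole op
            return 0
        if ret < -1:          # ret == -(1 + heads removed)
            changedheads += ret + 1
        elif ret > 1:         # ret == 1 + heads added
            changedheads += ret - 1
    if changedheads > 0:
        return 1 + changedheads
    if changedheads < 0:
        return -1 + changedheads
    return 1
```

For example, two changegroups reporting `3` and `2` (two and one new heads) combine to `4`; a single `-2` (one head removed) stays `-2`; an empty list reports plain success (`1`).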
@parthandler(
    b'changegroup',
    (
        b'version',
        b'nbchanges',
        b'exp-sidedata',
        b'exp-wanted-sidedata',
        b'treemanifest',
        b'targetphase',
    ),
)
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo"""
    from . import localrepo

    tr = op.gettransaction()
    unpackerversion = inpart.params.get(b'version', b'01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the one contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if b'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get(b'nbchanges'))
    if b'treemanifest' in inpart.params and not scmutil.istreemanifest(op.repo):
        if len(op.repo.changelog) != 0:
            raise error.Abort(
                _(
                    b"bundle contains tree manifests, but local repo is "
                    b"non-empty and does not use tree manifests"
                )
            )
        op.repo.requirements.add(requirements.TREEMANIFEST_REQUIREMENT)
        op.repo.svfs.options = localrepo.resolvestorevfsoptions(
            op.repo.ui, op.repo.requirements, op.repo.features
        )
        scmutil.writereporequirements(op.repo)

    extrakwargs = {}
    targetphase = inpart.params.get(b'targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)

    remote_sidedata = inpart.params.get(b'exp-wanted-sidedata')
    extrakwargs['sidedata_categories'] = read_wanted_sidedata(remote_sidedata)

    ret = _processchangegroup(
        op,
        cg,
        tr,
        op.source,
        b'bundle2',
        expectedtotal=nbchangesets,
        **extrakwargs
    )
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one need to start somewhere.
        part = op.reply.newpart(b'reply:changegroup', mandatory=False)
        part.addparam(
            b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
        )
        part.addparam(b'return', b'%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(
    [b'url', b'size', b'digests']
    + [b'digest:%s' % k for k in util.DIGESTS.keys()]
)


@parthandler(b'remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import are given as
    parameters. The parameters include:
    - url: the url to the bundle10.
    - size: the bundle10 file size. It is used to validate what was
      retrieved by the client matches the server knowledge about the bundle.
    - digests: a space separated list of the digest types provided as
      parameters.
    - digest:<digest-type>: the hexadecimal representation of the digest with
      that name. Like the size, it is used to validate what was retrieved by
      the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params[b'url']
    except KeyError:
        raise error.Abort(_(b'remote-changegroup: missing "%s" param') % b'url')
    parsed_url = urlutil.url(raw_url)
    if parsed_url.scheme not in capabilities[b'remote-changegroup']:
        raise error.Abort(
            _(b'remote-changegroup does not support %s urls')
            % parsed_url.scheme
        )

    try:
        size = int(inpart.params[b'size'])
    except ValueError:
        raise error.Abort(
            _(b'remote-changegroup: invalid value for param "%s"') % b'size'
        )
    except KeyError:
        raise error.Abort(
            _(b'remote-changegroup: missing "%s" param') % b'size'
        )

    digests = {}
    for typ in inpart.params.get(b'digests', b'').split():
        param = b'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(
                _(b'remote-changegroup: missing "%s" param') % param
            )
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange

    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(
            _(b'%s: not a bundle version 1.0') % urlutil.hidepassword(raw_url)
        )
    ret = _processchangegroup(op, cg, tr, op.source, b'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one need to start somewhere.
        part = op.reply.newpart(b'reply:changegroup')
        part.addparam(
            b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
        )
        part.addparam(b'return', b'%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(
            _(b'bundle at %s is corrupted:\n%s')
            % (urlutil.hidepassword(raw_url), e.message)
        )
    assert not inpart.read()

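The `size` and `digest:*` validation that `util.digestchecker` performs for the handler above can be sketched independently with `hashlib`. This is a simplified model, not Mercurial's actual class (the real one wraps a file-like object and hashes bytes as they stream past; here we hash a fully buffered payload for clarity):

```python
import hashlib

class DigestChecker:
    """Sketch of remote-changegroup validation: compare the byte count
    against the advertised 'size' param and each advertised hex digest
    against a freshly computed one."""

    def __init__(self, data, size, digests):
        self._data = data        # the downloaded bundle payload
        self._size = size        # value of the 'size' part parameter
        self._digests = digests  # e.g. {'sha1': '<hex>'} from digest:* params

    def validate(self):
        if len(self._data) != self._size:
            raise ValueError('size mismatch')
        for name, expected in self._digests.items():
            actual = hashlib.new(name, self._data).hexdigest()
            if actual != expected:
                raise ValueError('%s digest mismatch' % name)
```

As in the handler, every advertised digest type is checked, and a mismatch in any one of them rejects the bundle.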
@parthandler(b'reply:changegroup', (b'return', b'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params[b'return'])
    replyto = int(inpart.params[b'in-reply-to'])
    op.records.add(b'changegroup', {b'return': ret}, replyto)

@parthandler(b'check:bookmarks')
def handlecheckbookmarks(op, inpart):
    """check location of bookmarks

    This part is used to detect push races on bookmarks. It contains
    binary encoded (bookmark, node) tuples. If the local state does not
    match the one in the part, a PushRaced exception is raised.
    """
    bookdata = bookmarks.binarydecode(op.repo, inpart)

    msgstandard = (
        b'remote repository changed while pushing - please try again '
        b'(bookmark "%s" move from %s to %s)'
    )
    msgmissing = (
        b'remote repository changed while pushing - please try again '
        b'(bookmark "%s" is missing, expected %s)'
    )
    msgexist = (
        b'remote repository changed while pushing - please try again '
        b'(bookmark "%s" set on %s, expected missing)'
    )
    for book, node in bookdata:
        currentnode = op.repo._bookmarks.get(book)
        if currentnode != node:
            if node is None:
                finalmsg = msgexist % (book, short(currentnode))
            elif currentnode is None:
                finalmsg = msgmissing % (book, short(node))
            else:
                finalmsg = msgstandard % (
                    book,
                    short(node),
                    short(currentnode),
                )
            raise error.PushRaced(finalmsg)

@parthandler(b'check:heads')
def handlecheckheads(op, inpart):
    """check that head of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced(
            b'remote repository changed while pushing - please try again'
        )

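Both this handler and `check:updated-heads` below decode their payload the same way: read fixed-width 20-byte nodes until a short read marks the end of the part. A self-contained sketch of that loop (the `read_nodes` helper name is illustrative; any object with a `read(n)` method works):

```python
import io

NODE_LEN = 20  # length of a binary SHA-1 node

def read_nodes(part):
    """Collect fixed-width binary nodes from a part payload, stopping at
    the first short (or empty) read, as the check:heads handlers do."""
    nodes = []
    chunk = part.read(NODE_LEN)
    while len(chunk) == NODE_LEN:
        nodes.append(chunk)
        chunk = part.read(NODE_LEN)
    assert not chunk  # a trailing partial record would indicate corruption
    return nodes
```

The trailing `assert not chunk` matches the handlers above: an empty final read is the normal end-of-payload marker, while a partial node means the stream was truncated.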
@parthandler(b'check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually
    updated during the push. Other activity on unrelated heads is ignored.

    This allows servers with high traffic to avoid push contention as long
    as only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().iterheads():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced(
                b'remote repository changed while pushing - '
                b'please try again'
            )

@parthandler(b'check:phases')
def handlecheckphases(op, inpart):
    """check that phase boundaries of the repository did not change

    This is used to detect a push race.
    """
    phasetonodes = phases.binarydecode(inpart)
    unfi = op.repo.unfiltered()
    cl = unfi.changelog
    phasecache = unfi._phasecache
    msg = (
        b'remote repository changed while pushing - please try again '
        b'(%s is %s expected %s)'
    )
    for expectedphase, nodes in phasetonodes.items():
        for n in nodes:
            actualphase = phasecache.phase(unfi, cl.rev(n))
            if actualphase != expectedphase:
                finalmsg = msg % (
                    short(n),
                    phases.phasenames[actualphase],
                    phases.phasenames[expectedphase],
                )
                raise error.PushRaced(finalmsg)

@parthandler(b'output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_(b'remote: %s\n') % line)


@parthandler(b'replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)


class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""


@parthandler(b'error:abort', (b'message', b'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(
        inpart.params[b'message'], hint=inpart.params.get(b'hint')
    )

@parthandler(
    b'error:pushkey',
    (b'namespace', b'key', b'new', b'old', b'ret', b'in-reply-to'),
)
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in (b'namespace', b'key', b'new', b'old', b'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(
        inpart.params[b'in-reply-to'], **pycompat.strkwargs(kwargs)
    )


@parthandler(b'error:unsupportedcontent', (b'parttype', b'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get(b'parttype')
    if parttype is not None:
        kwargs[b'parttype'] = parttype
    params = inpart.params.get(b'params')
    if params is not None:
        kwargs[b'params'] = params.split(b'\0')

    raise error.BundleUnknownFeatureError(**pycompat.strkwargs(kwargs))


@parthandler(b'error:pushraced', (b'message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_(b'push failed:'), inpart.params[b'message'])

@parthandler(b'listkeys', (b'namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params[b'namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add(b'listkeys', (namespace, r))


@parthandler(b'pushkey', (b'namespace', b'key', b'old', b'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params[b'namespace'])
    key = dec(inpart.params[b'key'])
    old = dec(inpart.params[b'old'])
    new = dec(inpart.params[b'new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {b'namespace': namespace, b'key': key, b'old': old, b'new': new}
    op.records.add(b'pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart(b'reply:pushkey')
        rpart.addparam(
            b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
        )
        rpart.addparam(b'return', b'%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in (b'namespace', b'key', b'new', b'old', b'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(
            partid=b'%d' % inpart.id, **pycompat.strkwargs(kwargs)
        )

@parthandler(b'bookmarks')
def handlebookmark(op, inpart):
    """transmit bookmark information

    The part contains binary encoded bookmark information.

    The exact behavior of this part can be controlled by the 'bookmarks' mode
    on the bundle operation.

    When mode is 'apply' (the default) the bookmark information is applied as
    is to the unbundling repository. Make sure a 'check:bookmarks' part is
    issued earlier to check for push races in such update. This behavior is
    suitable for pushing.

    When mode is 'records', the information is recorded into the 'bookmarks'
    records of the bundle operation. This behavior is suitable for pulling.
    """
    changes = bookmarks.binarydecode(op.repo, inpart)

    pushkeycompat = op.repo.ui.configbool(
        b'server', b'bookmarks-pushkey-compat'
    )
    bookmarksmode = op.modes.get(b'bookmarks', b'apply')

    if bookmarksmode == b'apply':
        tr = op.gettransaction()
        bookstore = op.repo._bookmarks
        if pushkeycompat:
            allhooks = []
            for book, node in changes:
                hookargs = tr.hookargs.copy()
                hookargs[b'pushkeycompat'] = b'1'
                hookargs[b'namespace'] = b'bookmarks'
                hookargs[b'key'] = book
                hookargs[b'old'] = hex(bookstore.get(book, b''))
                hookargs[b'new'] = hex(node if node is not None else b'')
                allhooks.append(hookargs)

            for hookargs in allhooks:
                op.repo.hook(
                    b'prepushkey', throw=True, **pycompat.strkwargs(hookargs)
                )

        for book, node in changes:
            if bookmarks.isdivergent(book):
                msg = _(b'cannot accept divergent bookmark %s!') % book
                raise error.Abort(msg)

        bookstore.applychanges(op.repo, op.gettransaction(), changes)

        if pushkeycompat:

            def runhook(unused_success):
                for hookargs in allhooks:
                    op.repo.hook(b'pushkey', **pycompat.strkwargs(hookargs))

            op.repo._afterlock(runhook)

    elif bookmarksmode == b'records':
        for book, node in changes:
2489 record = {b'bookmark': book, b'node': node}
2490 record = {b'bookmark': book, b'node': node}
2490 op.records.add(b'bookmarks', record)
2491 op.records.add(b'bookmarks', record)
2491 else:
2492 else:
2492 raise error.ProgrammingError(
2493 raise error.ProgrammingError(
2493 b'unknown bookmark mode: %s' % bookmarksmode
2494 b'unknown bookmark mode: %s' % bookmarksmode
2494 )
2495 )
2495
2496
2496
2497
2497 @parthandler(b'phase-heads')
2498 @parthandler(b'phase-heads')
2498 def handlephases(op, inpart):
2499 def handlephases(op, inpart):
2499 """apply phases from bundle part to repo"""
2500 """apply phases from bundle part to repo"""
2500 headsbyphase = phases.binarydecode(inpart)
2501 headsbyphase = phases.binarydecode(inpart)
2501 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
2502 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
2502
2503
2503
2504
2504 @parthandler(b'reply:pushkey', (b'return', b'in-reply-to'))
2505 @parthandler(b'reply:pushkey', (b'return', b'in-reply-to'))
2505 def handlepushkeyreply(op, inpart):
2506 def handlepushkeyreply(op, inpart):
2506 """retrieve the result of a pushkey request"""
2507 """retrieve the result of a pushkey request"""
2507 ret = int(inpart.params[b'return'])
2508 ret = int(inpart.params[b'return'])
2508 partid = int(inpart.params[b'in-reply-to'])
2509 partid = int(inpart.params[b'in-reply-to'])
2509 op.records.add(b'pushkey', {b'return': ret}, partid)
2510 op.records.add(b'pushkey', {b'return': ret}, partid)
2510
2511
2511
2512
2512 @parthandler(b'obsmarkers')
2513 @parthandler(b'obsmarkers')
2513 def handleobsmarker(op, inpart):
2514 def handleobsmarker(op, inpart):
2514 """add a stream of obsmarkers to the repo"""
2515 """add a stream of obsmarkers to the repo"""
2515 tr = op.gettransaction()
2516 tr = op.gettransaction()
2516 markerdata = inpart.read()
2517 markerdata = inpart.read()
2517 if op.ui.config(b'experimental', b'obsmarkers-exchange-debug'):
2518 if op.ui.config(b'experimental', b'obsmarkers-exchange-debug'):
2518 op.ui.writenoi18n(
2519 op.ui.writenoi18n(
2519 b'obsmarker-exchange: %i bytes received\n' % len(markerdata)
2520 b'obsmarker-exchange: %i bytes received\n' % len(markerdata)
2520 )
2521 )
2521 # The mergemarkers call will crash if marker creation is not enabled.
2522 # The mergemarkers call will crash if marker creation is not enabled.
2522 # we want to avoid this if the part is advisory.
2523 # we want to avoid this if the part is advisory.
2523 if not inpart.mandatory and op.repo.obsstore.readonly:
2524 if not inpart.mandatory and op.repo.obsstore.readonly:
2524 op.repo.ui.debug(
2525 op.repo.ui.debug(
2525 b'ignoring obsolescence markers, feature not enabled\n'
2526 b'ignoring obsolescence markers, feature not enabled\n'
2526 )
2527 )
2527 return
2528 return
2528 new = op.repo.obsstore.mergemarkers(tr, markerdata)
2529 new = op.repo.obsstore.mergemarkers(tr, markerdata)
2529 op.repo.invalidatevolatilesets()
2530 op.repo.invalidatevolatilesets()
2530 op.records.add(b'obsmarkers', {b'new': new})
2531 op.records.add(b'obsmarkers', {b'new': new})
2531 if op.reply is not None:
2532 if op.reply is not None:
2532 rpart = op.reply.newpart(b'reply:obsmarkers')
2533 rpart = op.reply.newpart(b'reply:obsmarkers')
2533 rpart.addparam(
2534 rpart.addparam(
2534 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2535 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2535 )
2536 )
2536 rpart.addparam(b'new', b'%i' % new, mandatory=False)
2537 rpart.addparam(b'new', b'%i' % new, mandatory=False)
2537
2538
2538
2539
2539 @parthandler(b'reply:obsmarkers', (b'new', b'in-reply-to'))
2540 @parthandler(b'reply:obsmarkers', (b'new', b'in-reply-to'))
2540 def handleobsmarkerreply(op, inpart):
2541 def handleobsmarkerreply(op, inpart):
2541 """retrieve the result of a pushkey request"""
2542 """retrieve the result of a pushkey request"""
2542 ret = int(inpart.params[b'new'])
2543 ret = int(inpart.params[b'new'])
2543 partid = int(inpart.params[b'in-reply-to'])
2544 partid = int(inpart.params[b'in-reply-to'])
2544 op.records.add(b'obsmarkers', {b'new': ret}, partid)
2545 op.records.add(b'obsmarkers', {b'new': ret}, partid)
2545
2546
2546
2547
2547 @parthandler(b'hgtagsfnodes')
2548 @parthandler(b'hgtagsfnodes')
2548 def handlehgtagsfnodes(op, inpart):
2549 def handlehgtagsfnodes(op, inpart):
2549 """Applies .hgtags fnodes cache entries to the local repo.
2550 """Applies .hgtags fnodes cache entries to the local repo.
2550
2551
2551 Payload is pairs of 20 byte changeset nodes and filenodes.
2552 Payload is pairs of 20 byte changeset nodes and filenodes.
2552 """
2553 """
2553 # Grab the transaction so we ensure that we have the lock at this point.
2554 # Grab the transaction so we ensure that we have the lock at this point.
2554 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2555 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2555 op.gettransaction()
2556 op.gettransaction()
2556 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
2557 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
2557
2558
2558 count = 0
2559 count = 0
2559 while True:
2560 while True:
2560 node = inpart.read(20)
2561 node = inpart.read(20)
2561 fnode = inpart.read(20)
2562 fnode = inpart.read(20)
2562 if len(node) < 20 or len(fnode) < 20:
2563 if len(node) < 20 or len(fnode) < 20:
2563 op.ui.debug(b'ignoring incomplete received .hgtags fnodes data\n')
2564 op.ui.debug(b'ignoring incomplete received .hgtags fnodes data\n')
2564 break
2565 break
2565 cache.setfnode(node, fnode)
2566 cache.setfnode(node, fnode)
2566 count += 1
2567 count += 1
2567
2568
2568 cache.write()
2569 cache.write()
2569 op.ui.debug(b'applied %i hgtags fnodes cache entries\n' % count)
2570 op.ui.debug(b'applied %i hgtags fnodes cache entries\n' % count)
2570
2571
2571
2572
2572 rbcstruct = struct.Struct(b'>III')
2573 rbcstruct = struct.Struct(b'>III')
2573
2574
2574
2575
2575 @parthandler(b'cache:rev-branch-cache')
2576 @parthandler(b'cache:rev-branch-cache')
2576 def handlerbc(op, inpart):
2577 def handlerbc(op, inpart):
2577 """Legacy part, ignored for compatibility with bundles from or
2578 """Legacy part, ignored for compatibility with bundles from or
2578 for Mercurial before 5.7. Newer Mercurial computes the cache
2579 for Mercurial before 5.7. Newer Mercurial computes the cache
2579 efficiently enough during unbundling that the additional transfer
2580 efficiently enough during unbundling that the additional transfer
2580 is unnecessary."""
2581 is unnecessary."""
2581
2582
2582
2583
2583 @parthandler(b'pushvars')
2584 @parthandler(b'pushvars')
2584 def bundle2getvars(op, part):
2585 def bundle2getvars(op, part):
2585 '''unbundle a bundle2 containing shellvars on the server'''
2586 '''unbundle a bundle2 containing shellvars on the server'''
2586 # An option to disable unbundling on server-side for security reasons
2587 # An option to disable unbundling on server-side for security reasons
2587 if op.ui.configbool(b'push', b'pushvars.server'):
2588 if op.ui.configbool(b'push', b'pushvars.server'):
2588 hookargs = {}
2589 hookargs = {}
2589 for key, value in part.advisoryparams:
2590 for key, value in part.advisoryparams:
2590 key = key.upper()
2591 key = key.upper()
2591 # We want pushed variables to have USERVAR_ prepended so we know
2592 # We want pushed variables to have USERVAR_ prepended so we know
2592 # they came from the --pushvar flag.
2593 # they came from the --pushvar flag.
2593 key = b"USERVAR_" + key
2594 key = b"USERVAR_" + key
2594 hookargs[key] = value
2595 hookargs[key] = value
2595 op.addhookargs(hookargs)
2596 op.addhookargs(hookargs)
2596
2597
2597
2598
2598 @parthandler(b'stream2', (b'requirements', b'filecount', b'bytecount'))
2599 @parthandler(b'stream2', (b'requirements', b'filecount', b'bytecount'))
2599 def handlestreamv2bundle(op, part):
2600 def handlestreamv2bundle(op, part):
2600
2601
2601 requirements = urlreq.unquote(part.params[b'requirements'])
2602 requirements = urlreq.unquote(part.params[b'requirements'])
2602 requirements = requirements.split(b',') if requirements else []
2603 requirements = requirements.split(b',') if requirements else []
2603 filecount = int(part.params[b'filecount'])
2604 filecount = int(part.params[b'filecount'])
2604 bytecount = int(part.params[b'bytecount'])
2605 bytecount = int(part.params[b'bytecount'])
2605
2606
2606 repo = op.repo
2607 repo = op.repo
2607 if len(repo):
2608 if len(repo):
2608 msg = _(b'cannot apply stream clone to non empty repository')
2609 msg = _(b'cannot apply stream clone to non empty repository')
2609 raise error.Abort(msg)
2610 raise error.Abort(msg)
2610
2611
2611 repo.ui.debug(b'applying stream bundle\n')
2612 repo.ui.debug(b'applying stream bundle\n')
2612 streamclone.applybundlev2(repo, part, filecount, bytecount, requirements)
2613 streamclone.applybundlev2(repo, part, filecount, bytecount, requirements)
2613
2614
2614
2615
2615 @parthandler(b'stream3-exp', (b'requirements',))
2616 @parthandler(b'stream3-exp', (b'requirements',))
2616 def handlestreamv3bundle(op, part):
2617 def handlestreamv3bundle(op, part):
2617 requirements = urlreq.unquote(part.params[b'requirements'])
2618 requirements = urlreq.unquote(part.params[b'requirements'])
2618 requirements = requirements.split(b',') if requirements else []
2619 requirements = requirements.split(b',') if requirements else []
2619
2620
2620 repo = op.repo
2621 repo = op.repo
2621 if len(repo):
2622 if len(repo):
2622 msg = _(b'cannot apply stream clone to non empty repository')
2623 msg = _(b'cannot apply stream clone to non empty repository')
2623 raise error.Abort(msg)
2624 raise error.Abort(msg)
2624
2625
2625 repo.ui.debug(b'applying stream bundle\n')
2626 repo.ui.debug(b'applying stream bundle\n')
2626 streamclone.applybundlev3(repo, part, requirements)
2627 streamclone.applybundlev3(repo, part, requirements)
2627
2628
2628
2629
2629 def widen_bundle(
2630 def widen_bundle(
2630 bundler, repo, oldmatcher, newmatcher, common, known, cgversion, ellipses
2631 bundler, repo, oldmatcher, newmatcher, common, known, cgversion, ellipses
2631 ):
2632 ):
2632 """generates bundle2 for widening a narrow clone
2633 """generates bundle2 for widening a narrow clone
2633
2634
2634 bundler is the bundle to which data should be added
2635 bundler is the bundle to which data should be added
2635 repo is the localrepository instance
2636 repo is the localrepository instance
2636 oldmatcher matches what the client already has
2637 oldmatcher matches what the client already has
2637 newmatcher matches what the client needs (including what it already has)
2638 newmatcher matches what the client needs (including what it already has)
2638 common is set of common heads between server and client
2639 common is set of common heads between server and client
2639 known is a set of revs known on the client side (used in ellipses)
2640 known is a set of revs known on the client side (used in ellipses)
2640 cgversion is the changegroup version to send
2641 cgversion is the changegroup version to send
2641 ellipses is boolean value telling whether to send ellipses data or not
2642 ellipses is boolean value telling whether to send ellipses data or not
2642
2643
2643 returns bundle2 of the data required for extending
2644 returns bundle2 of the data required for extending
2644 """
2645 """
2645 commonnodes = set()
2646 commonnodes = set()
2646 cl = repo.changelog
2647 cl = repo.changelog
2647 for r in repo.revs(b"::%ln", common):
2648 for r in repo.revs(b"::%ln", common):
2648 commonnodes.add(cl.node(r))
2649 commonnodes.add(cl.node(r))
2649 if commonnodes:
2650 if commonnodes:
2650 packer = changegroup.getbundler(
2651 packer = changegroup.getbundler(
2651 cgversion,
2652 cgversion,
2652 repo,
2653 repo,
2653 oldmatcher=oldmatcher,
2654 oldmatcher=oldmatcher,
2654 matcher=newmatcher,
2655 matcher=newmatcher,
2655 fullnodes=commonnodes,
2656 fullnodes=commonnodes,
2656 )
2657 )
2657 cgdata = packer.generate(
2658 cgdata = packer.generate(
2658 {repo.nullid},
2659 {repo.nullid},
2659 list(commonnodes),
2660 list(commonnodes),
2660 False,
2661 False,
2661 b'narrow_widen',
2662 b'narrow_widen',
2662 changelog=False,
2663 changelog=False,
2663 )
2664 )
2664
2665
2665 part = bundler.newpart(b'changegroup', data=cgdata)
2666 part = bundler.newpart(b'changegroup', data=cgdata)
2666 part.addparam(b'version', cgversion)
2667 part.addparam(b'version', cgversion)
2667 if scmutil.istreemanifest(repo):
2668 if scmutil.istreemanifest(repo):
2668 part.addparam(b'treemanifest', b'1')
2669 part.addparam(b'treemanifest', b'1')
2669 if repository.REPO_FEATURE_SIDE_DATA in repo.features:
2670 if repository.REPO_FEATURE_SIDE_DATA in repo.features:
2670 part.addparam(b'exp-sidedata', b'1')
2671 part.addparam(b'exp-sidedata', b'1')
2671 wanted = format_remote_wanted_sidedata(repo)
2672 wanted = format_remote_wanted_sidedata(repo)
2672 part.addparam(b'exp-wanted-sidedata', wanted)
2673 part.addparam(b'exp-wanted-sidedata', wanted)
2673
2674
2674 return bundler
2675 return bundler
@@ -1,562 +1,559 @@
# bundlecaches.py - utility to deal with pre-computed bundles for servers
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

import collections

from typing import (
    cast,
)

from .i18n import _

from .thirdparty import attr

from . import (
    error,
    requirements as requirementsmod,
    sslutil,
    util,
)
from .utils import stringutil

urlreq = util.urlreq

BUNDLE_CACHE_DIR = b'bundle-cache'
CB_MANIFEST_FILE = b'clonebundles.manifest'
CLONEBUNDLESCHEME = b"peer-bundle-cache://"


def get_manifest(repo):
    """get the bundle manifest to be served to a client from a server"""
    raw_text = repo.vfs.tryread(CB_MANIFEST_FILE)
    entries = [e.split(b' ', 1) for e in raw_text.splitlines()]

    new_lines = []
    for e in entries:
        url = alter_bundle_url(repo, e[0])
        if len(e) == 1:
            line = url + b'\n'
        else:
            line = b"%s %s\n" % (url, e[1])
        new_lines.append(line)
    return b''.join(new_lines)


def alter_bundle_url(repo, url):
    """a function that exists to help extensions and hosting services alter
    the url

    This will typically be used to inject authentication information in the url
    of cached bundles."""
    return url


SUPPORTED_CLONEBUNDLE_SCHEMES = [
    b"http://",
    b"https://",
    b"largefile://",
    CLONEBUNDLESCHEME,
]


@attr.s
class bundlespec:
    compression = attr.ib()
    wirecompression = attr.ib()
    version = attr.ib()
    wireversion = attr.ib()
    # parameters explicitly overwritten by the config or the specification
    _explicit_params = attr.ib()
    # default parameters for the version
    #
    # Keeping them separated is useful to check what was actually overwritten.
    _default_opts = attr.ib()

    @property
    def params(self):
        return collections.ChainMap(self._explicit_params, self._default_opts)

    @property
    def contentopts(self):
        # kept for backward compatibility concerns.
        return self.params

    def set_param(self, key, value, overwrite=True):
        """Set a bundle parameter value.

        Will only overwrite if overwrite is true"""
        if overwrite or key not in self._explicit_params:
            self._explicit_params[key] = value

    def as_spec(self):
        parts = [b"%s-%s" % (self.compression, self.version)]
        for param in sorted(self._explicit_params.items()):
            parts.append(b'%s=%s' % param)
        return b';'.join(parts)

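The `as_spec` method above rebuilds the canonical `<compression>-<version>[;key=value...]` string from the explicit parameters only. A standalone sketch of that formatting (an assumed simplification: a plain function instead of the attrs-based `bundlespec` class):

```python
# Standalone sketch of the as_spec() formatting above; a plain function
# stands in for the attrs-based bundlespec class.
def as_spec(compression, version, explicit_params):
    parts = [b"%s-%s" % (compression, version)]
    # explicit parameters are appended in sorted key order
    for param in sorted(explicit_params.items()):
        parts.append(b'%s=%s' % param)
    return b';'.join(parts)

spec = as_spec(b'zstd', b'v2', {b'obsolescence': b'true'})
# spec == b'zstd-v2;obsolescence=true'
```

Note that only `_explicit_params` round-trips through the spec string; the per-version defaults are reconstructed from the version name on the other side.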
# Maps bundle version human names to changegroup versions.
_bundlespeccgversions = {
    b'v1': b'01',
    b'v2': b'02',
    b'v3': b'03',
    b'packed1': b's1',
    b'bundle2': b'02',  # legacy
}

# Maps bundle version names to the content options used to choose which
# parts to bundle.
_bundlespeccontentopts = {
    b'v1': {
        b'changegroup': True,
        b'cg.version': b'01',
        b'obsolescence': False,
        b'phases': False,
        b'tagsfnodescache': False,
        b'revbranchcache': False,
    },
    b'v2': {
        b'changegroup': True,
        b'cg.version': b'02',
        b'obsolescence': False,
        b'phases': False,
        b'tagsfnodescache': True,
        b'revbranchcache': True,
    },
    b'v3': {
        b'changegroup': True,
        b'cg.version': b'03',
        b'obsolescence': False,
        b'phases': True,
        b'tagsfnodescache': True,
        b'revbranchcache': True,
    },
    b'streamv2': {
        b'changegroup': False,
        b'cg.version': b'02',
        b'obsolescence': False,
        b'phases': False,
-        b"streamv2": True,
+        b"stream": b"v2",
        b'tagsfnodescache': False,
        b'revbranchcache': False,
    },
    b'streamv3-exp': {
        b'changegroup': False,
        b'cg.version': b'03',
        b'obsolescence': False,
        b'phases': False,
-        b"streamv3-exp": True,
+        b"stream": b"v3-exp",
        b'tagsfnodescache': False,
        b'revbranchcache': False,
    },
    b'packed1': {
        b'cg.version': b's1',
    },
    b'bundle2': {  # legacy
        b'cg.version': b'02',
    },
}
_bundlespeccontentopts[b'bundle2'] = _bundlespeccontentopts[b'v2']
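The change above is the point of this patch: stream variants now carry a single `stream` key naming the stream format version, instead of a per-version boolean flag (`streamv2`/`streamv3-exp`). A trimmed, self-contained copy of the lookup (illustrative values only, not an import of the real module) shows how the table drives part selection:

```python
# Trimmed, self-contained copy of the _bundlespeccontentopts lookup above
# (illustrative subset; the real table carries more options per version).
_contentopts = {
    b'v2': {b'changegroup': True, b'cg.version': b'02'},
    b'streamv2': {b'changegroup': False, b'cg.version': b'02',
                  b'stream': b'v2'},
}

def wants_changegroup(version):
    # stream bundles ship raw store files, so no changegroup part is built
    return _contentopts[version][b'changegroup']

def stream_version(version):
    # the b'stream' key names the stream variant; absent for normal bundles
    return _contentopts[version].get(b'stream')
```

With this shape, code that needs the stream format version reads one key rather than probing a family of boolean flags.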

_bundlespecvariants = {b"streamv2": {}}

# Compression engines allowed in version 1. THIS SHOULD NEVER CHANGE.
_bundlespecv1compengines = {b'gzip', b'bzip2', b'none'}


def param_bool(key, value):
    """make a boolean out of a parameter value"""
    b = stringutil.parsebool(value)
    if b is None:
        msg = _(b"parameter %s should be a boolean ('%s')")
        msg %= (key, value)
        raise error.InvalidBundleSpecification(msg)
    return b


# mapping of known parameter names that need their value processed
bundle_spec_param_processing = {
    b"obsolescence": param_bool,
    b"obsolescence-mandatory": param_bool,
    b"phases": param_bool,
}


def _parseparams(s):
    """parse bundlespec parameter section

    input: "comp-version;params" string

    return: (spec; {param_key: param_value})
    """
    if b';' not in s:
        return s, {}

    params = {}
    version, paramstr = s.split(b';', 1)

    err = _(b'invalid bundle specification: missing "=" in parameter: %s')
    for p in paramstr.split(b';'):
        if b'=' not in p:
            msg = err % p
            raise error.InvalidBundleSpecification(msg)

        key, value = p.split(b'=', 1)
        key = urlreq.unquote(key)
        value = urlreq.unquote(value)
        process = bundle_spec_param_processing.get(key)
        if process is not None:
            value = process(key, value)
        params[key] = value

    return version, params


def parsebundlespec(repo, spec, strict=True):
    """Parse a bundle string specification into parts.

    Bundle specifications denote a well-defined bundle/exchange format.
    The content of a given specification should not change over time in
    order to ensure that bundles produced by a newer version of Mercurial are
    readable from an older version.

    The string currently has the form:

       <compression>-<type>[;<parameter0>[;<parameter1>]]

    Where <compression> is one of the supported compression formats
    and <type> is (currently) a version string. A ";" can follow the type and
    all text afterwards is interpreted as URI encoded, ";" delimited key=value
    pairs.

    If ``strict`` is True (the default) <compression> is required. Otherwise,
    it is optional.

    Returns a bundlespec object of (compression, version, parameters).
    Compression will be ``None`` if not in strict mode and a compression isn't
    defined.

    An ``InvalidBundleSpecification`` is raised when the specification is
    not syntactically well formed.

    An ``UnsupportedBundleSpecification`` is raised when the compression or
    bundle type/version is not recognized.

    Note: this function will likely eventually return a more complex data
    structure, including bundle2 part information.
    """
    if strict and b'-' not in spec:
        raise error.InvalidBundleSpecification(
            _(
                b'invalid bundle specification; '
                b'must be prefixed with compression: %s'
            )
            % spec
        )

    pre_args = spec.split(b';', 1)[0]
    if b'-' in pre_args:
        compression, version = spec.split(b'-', 1)

        if compression not in util.compengines.supportedbundlenames:
            raise error.UnsupportedBundleSpecification(
                _(b'%s compression is not supported') % compression
            )

        version, params = _parseparams(version)

        if version not in _bundlespeccontentopts:
            raise error.UnsupportedBundleSpecification(
                _(b'%s is not a recognized bundle version') % version
            )
    else:
        # Value could be just the compression or just the version, in which
        # case some defaults are assumed (but only when not in strict mode).
        assert not strict

        spec, params = _parseparams(spec)

        if spec in util.compengines.supportedbundlenames:
            compression = spec
            version = b'v1'
            # Generaldelta repos require v2.
            if requirementsmod.GENERALDELTA_REQUIREMENT in repo.requirements:
                version = b'v2'
            elif requirementsmod.REVLOGV2_REQUIREMENT in repo.requirements:
                version = b'v2'
            # Modern compression engines require v2.
            if compression not in _bundlespecv1compengines:
                version = b'v2'
        elif spec in _bundlespeccontentopts:
            if spec == b'packed1':
                compression = b'none'
            else:
                compression = b'bzip2'
            version = spec
        else:
            raise error.UnsupportedBundleSpecification(
                _(b'%s is not a recognized bundle specification') % spec
            )

    # Bundle version 1 only supports a known set of compression engines.
    if version == b'v1' and compression not in _bundlespecv1compengines:
        raise error.UnsupportedBundleSpecification(
            _(b'compression engine %s is not supported on v1 bundles')
            % compression
        )

    # The specification for packed1 can optionally declare the data formats
    # required to apply it. If we see this metadata, compare against what the
    # repo supports and error if the bundle isn't compatible.
    if version == b'packed1' and b'requirements' in params:
        requirements = set(cast(bytes, params[b'requirements']).split(b','))
        missingreqs = requirements - requirementsmod.STREAM_FIXED_REQUIREMENTS
        if missingreqs:
            raise error.UnsupportedBundleSpecification(
                _(b'missing support for repository features: %s')
                % b', '.join(sorted(missingreqs))
            )

    # Compute contentopts based on the version
    if b"stream" in params:
        # This case is fishy as this mostly derails the version selection
        # mechanism. `stream` bundles are quite specific and used differently
324 # as "normal" bundles.
324 # as "normal" bundles.
325 #
325 #
326 # (we should probably define a cleaner way to do this and raise a
326 # (we should probably define a cleaner way to do this and raise a
327 # warning when the old way is encountered)
327 # warning when the old way is encountered)
328 if params[b"stream"] == b"v2":
328 if params[b"stream"] == b"v2":
329 version = b"streamv2"
329 version = b"streamv2"
330 if params[b"stream"] == b"v3-exp":
330 if params[b"stream"] == b"v3-exp":
331 version = b"streamv3-exp"
331 version = b"streamv3-exp"
332 contentopts = _bundlespeccontentopts.get(version, {}).copy()
332 contentopts = _bundlespeccontentopts.get(version, {}).copy()
333 if version == b"streamv2" or version == b"streamv3-exp":
333 if version == b"streamv2" or version == b"streamv3-exp":
334 # streamv2 have been reported as "v2" for a while.
334 # streamv2 have been reported as "v2" for a while.
335 version = b"v2"
335 version = b"v2"
336
336
337 engine = util.compengines.forbundlename(compression)
337 engine = util.compengines.forbundlename(compression)
338 compression, wirecompression = engine.bundletype()
338 compression, wirecompression = engine.bundletype()
339 wireversion = _bundlespeccontentopts[version][b'cg.version']
339 wireversion = _bundlespeccontentopts[version][b'cg.version']
340
340
341 return bundlespec(
341 return bundlespec(
342 compression, wirecompression, version, wireversion, params, contentopts
342 compression, wirecompression, version, wireversion, params, contentopts
343 )
343 )
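The branch above handles the `<compression>-<version>` bundlespec syntax with optional `;key=value` parameters. A minimal standalone sketch of that surface syntax (names here are illustrative, not Mercurial's API; a bare value may be either the compression or the version, which the real code disambiguates against known engine and version names):

```python
def parse_spec(spec: bytes):
    """Split a bundlespec into (base-or-compression, version, params)."""
    # Split off ";param=value;..." parameters, if any.
    if b';' in spec:
        base, rawparams = spec.split(b';', 1)
        params = dict(p.split(b'=', 1) for p in rawparams.split(b';') if p)
    else:
        base, params = spec, {}
    # A full spec is "<compression>-<version>"; a bare value stays ambiguous
    # in this sketch and is reported in the first slot.
    if b'-' in base:
        compression, version = base.split(b'-', 1)
    else:
        compression, version = base, None
    return compression, version, params
```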


def parseclonebundlesmanifest(repo, s):
    """Parses the raw text of a clone bundles manifest.

    Returns a list of dicts. The dicts have a ``URL`` key corresponding
    to the URL and other keys are the attributes for the entry.
    """
    m = []
    for line in s.splitlines():
        fields = line.split()
        if not fields:
            continue
        attrs = {b'URL': fields[0]}
        for rawattr in fields[1:]:
            key, value = rawattr.split(b'=', 1)
            key = util.urlreq.unquote(key)
            value = util.urlreq.unquote(value)
            attrs[key] = value

            # Parse BUNDLESPEC into components. This makes client-side
            # preferences easier to specify since you can prefer a single
            # component of the BUNDLESPEC.
            if key == b'BUNDLESPEC':
                try:
                    bundlespec = parsebundlespec(repo, value)
                    attrs[b'COMPRESSION'] = bundlespec.compression
                    attrs[b'VERSION'] = bundlespec.version
                except error.InvalidBundleSpecification:
                    pass
                except error.UnsupportedBundleSpecification:
                    pass

        m.append(attrs)

    return m
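The manifest format parsed above is line-oriented: a URL followed by whitespace-separated, percent-encoded `KEY=VALUE` attributes. A self-contained sketch of just that format (using `str` and the stdlib instead of Mercurial's byte-string helpers):

```python
from urllib.parse import unquote

def parse_manifest(text: str):
    """Parse 'URL KEY=VALUE ...' manifest lines into a list of dicts."""
    entries = []
    for line in text.splitlines():
        fields = line.split()
        if not fields:
            continue  # skip blank lines, as the real parser does
        attrs = {'URL': fields[0]}
        for rawattr in fields[1:]:
            key, value = rawattr.split('=', 1)
            # Keys and values are percent-encoded in the manifest.
            attrs[unquote(key)] = unquote(value)
        entries.append(attrs)
    return entries
```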


def isstreamclonespec(bundlespec):
    # Stream clone v1
    if bundlespec.wirecompression == b'UN' and bundlespec.wireversion == b's1':
        return True

    # Stream clone v2
    if (
        bundlespec.wirecompression == b'UN'
        and bundlespec.wireversion == b'02'
        and bundlespec.contentopts.get(b'stream', None) in (b"v2", b"v3-exp")
    ):
        return True

    return False


def filterclonebundleentries(
    repo, entries, streamclonerequested=False, pullbundles=False
):
    """Remove incompatible clone bundle manifest entries.

    Accepts a list of entries parsed with ``parseclonebundlesmanifest``
    and returns a new list consisting of only the entries that this client
    should be able to apply.

    There is no guarantee we'll be able to apply all returned entries because
    the metadata we use to filter on may be missing or wrong.
    """
    newentries = []
    for entry in entries:
        url = entry.get(b'URL')
        if not pullbundles and not any(
            [url.startswith(scheme) for scheme in SUPPORTED_CLONEBUNDLE_SCHEMES]
        ):
            repo.ui.debug(
                b'filtering %s because not a supported clonebundle scheme\n'
                % url
            )
            continue

        spec = entry.get(b'BUNDLESPEC')
        if spec:
            try:
                bundlespec = parsebundlespec(repo, spec, strict=True)

                # If a stream clone was requested, filter out non-streamclone
                # entries.
                if streamclonerequested and not isstreamclonespec(bundlespec):
                    repo.ui.debug(
                        b'filtering %s because not a stream clone\n' % url
                    )
                    continue

            except error.InvalidBundleSpecification as e:
                repo.ui.debug(stringutil.forcebytestr(e) + b'\n')
                continue
            except error.UnsupportedBundleSpecification as e:
                repo.ui.debug(
                    b'filtering %s because unsupported bundle '
                    b'spec: %s\n' % (url, stringutil.forcebytestr(e))
                )
                continue
        # If we don't have a spec and requested a stream clone, we don't know
        # what the entry is so don't attempt to apply it.
        elif streamclonerequested:
            repo.ui.debug(
                b'filtering %s because cannot determine if a stream '
                b'clone bundle\n' % url
            )
            continue

        if b'REQUIRESNI' in entry and not sslutil.hassni:
            repo.ui.debug(b'filtering %s because SNI not supported\n' % url)
            continue

        if b'REQUIREDRAM' in entry:
            try:
                requiredram = util.sizetoint(entry[b'REQUIREDRAM'])
            except error.ParseError:
                repo.ui.debug(
                    b'filtering %s due to a bad REQUIREDRAM attribute\n' % url
                )
                continue
            actualram = repo.ui.estimatememory()
            if actualram is not None and actualram * 0.66 < requiredram:
                repo.ui.debug(
                    b'filtering %s as it needs more than 2/3 of system memory\n'
                    % url
                )
                continue

        newentries.append(entry)

    return newentries
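The `REQUIREDRAM` filter above keeps an entry only when the declared requirement fits within two thirds of the estimated system memory (or when no estimate is available). That predicate, isolated as a sketch (the `ram_ok` name is illustrative, not Mercurial's):

```python
def ram_ok(required, actual):
    """Return True when an entry requiring `required` bytes survives
    the filter given `actual` bytes of estimated system memory
    (None when no estimate is available)."""
    if actual is None:
        # No estimate: give the entry the benefit of the doubt.
        return True
    # Mirrors the check above: filtered when actual * 0.66 < required.
    return actual * 0.66 >= required
```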


class clonebundleentry:
    """Represents an item in a clone bundles manifest.

    This rich class is needed to support sorting since sorted() in Python 3
    doesn't support ``cmp`` and our comparison is complex enough that ``key=``
    won't work.
    """

    def __init__(self, value, prefers):
        self.value = value
        self.prefers = prefers

    def _cmp(self, other):
        for prefkey, prefvalue in self.prefers:
            avalue = self.value.get(prefkey)
            bvalue = other.value.get(prefkey)

            # Special case for b missing attribute and a matches exactly.
            if avalue is not None and bvalue is None and avalue == prefvalue:
                return -1

            # Special case for a missing attribute and b matches exactly.
            if bvalue is not None and avalue is None and bvalue == prefvalue:
                return 1

            # We can't compare unless the attribute is present on both.
            if avalue is None or bvalue is None:
                continue

            # Same values should fall back to the next attribute.
            if avalue == bvalue:
                continue

            # Exact matches come first.
            if avalue == prefvalue:
                return -1
            if bvalue == prefvalue:
                return 1

            # Fall back to the next attribute.
            continue

        # If we got here we couldn't sort by attributes and prefers. Fall
        # back to index order.
        return 0

    def __lt__(self, other):
        return self._cmp(other) < 0

    def __gt__(self, other):
        return self._cmp(other) > 0

    def __eq__(self, other):
        return self._cmp(other) == 0

    def __le__(self, other):
        return self._cmp(other) <= 0

    def __ge__(self, other):
        return self._cmp(other) >= 0

    def __ne__(self, other):
        return self._cmp(other) != 0


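The docstring above notes that `sorted()` in Python 3 lacks a `cmp` argument, which is why the class defines the full set of rich comparisons. A simplified sketch of the same idea using the stdlib's `functools.cmp_to_key` instead (this drops the missing-attribute special cases of `_cmp` above, so it is an illustration of the technique, not a drop-in replacement):

```python
from functools import cmp_to_key

def prefer_cmp(prefers):
    """Build a cmp-style function ranking dicts by (key, value) preferences."""
    def _cmp(a, b):
        for key, value in prefers:
            av, bv = a.get(key), b.get(key)
            if av == bv:
                continue  # tie (including both missing): try next preference
            if av == value:
                return -1  # exact match sorts first
            if bv == value:
                return 1
        return 0  # no preference decided; sorted() is stable, keep order
    return _cmp

entries = [{'COMPRESSION': 'gzip'}, {'COMPRESSION': 'zstd'}]
ranked = sorted(entries, key=cmp_to_key(prefer_cmp([('COMPRESSION', 'zstd')])))
```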
def sortclonebundleentries(ui, entries):
    prefers = ui.configlist(b'ui', b'clonebundleprefers')
    if not prefers:
        return list(entries)

    def _split(p):
        if b'=' not in p:
            hint = _(b"each comma separated item should be key=value pairs")
            raise error.Abort(
                _(b"invalid ui.clonebundleprefers item: %s") % p, hint=hint
            )
        return p.split(b'=', 1)

    prefers = [_split(p) for p in prefers]

    items = sorted(clonebundleentry(v, prefers) for v in entries)
    return [i.value for i in items]