bundle: prevent implicit bundling of internal changeset...
marmoute - r51213:bcf54837 default
@@ -1,2625 +1,2634 b''
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part header. When the header is empty
69 The total number of Bytes used by the part header. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """


import collections
import errno
import os
import re
import string
import struct
import sys

from .i18n import _
from .node import (
    hex,
    short,
)
from . import (
    bookmarks,
    changegroup,
    encoding,
    error,
    obsolete,
    phases,
    pushkey,
    pycompat,
    requirements,
    scmutil,
    streamclone,
    tags,
    url,
    util,
)
from .utils import (
    stringutil,
    urlutil,
)
from .interfaces import repository

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = b'>i'
_fpartheadersize = b'>i'
_fparttypesize = b'>B'
_fpartid = b'>I'
_fpayloadsize = b'>i'
_fpartparamcount = b'>BB'

preferedchunksize = 32768

_parttypeforbidden = re.compile(b'[^a-zA-Z0-9_:-]')


def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool(b'devel', b'bundle2.debug'):
        ui.debug(b'bundle2-output: %s\n' % message)


def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool(b'devel', b'bundle2.debug'):
        ui.debug(b'bundle2-input: %s\n' % message)


def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)


def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return b'>' + (b'BB' * nbparams)
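
For instance, a part with three parameters (mandatory and advisory combined)
would be read with the format below; the values are purely illustrative:

    fmt = _makefpartparamsizes(3)      # b'>BBBBBB': one (key, value) size pair per param
    assert struct.calcsize(fmt) == 6   # one byte per size, two sizes per parameter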


parthandlermapping = {}


def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)

    def _decorator(func):
        lparttype = parttype.lower()  # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func

    return _decorator


class unbundlerecords:
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`. Where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__
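
A quick sketch of how these records behave; the categories used here are
arbitrary examples, not ones bundle2 necessarily emits:

    records = unbundlerecords()
    records.add(b'changegroup', {b'return': 1})
    records.add(b'output', b'remote: done\n')
    assert records[b'changegroup'] == ({b'return': 1},)
    assert list(records) == [
        (b'changegroup', {b'return': 1}),
        (b'output', b'remote: done\n'),
    ]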


class bundleoperation:
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(
        self,
        repo,
        transactiongetter,
        captureoutput=True,
        source=b'',
        remote=None,
    ):
        self.repo = repo
        # the peer object who produced this bundle if available
        self.remote = remote
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.reply = None
        self.captureoutput = captureoutput
        self.hookargs = {}
        self._gettransaction = transactiongetter
        # carries values that can modify part behavior
        self.modes = {}
        self.source = source

    def gettransaction(self):
        transaction = self._gettransaction()

        if self.hookargs:
            # the ones added to the transaction supersede those added
            # to the operation.
            self.hookargs.update(transaction.hookargs)
            transaction.hookargs = self.hookargs

            # mark the hookargs as flushed. further attempts to add to
            # hookargs will result in an abort.
            self.hookargs = None

        return transaction

    def addhookargs(self, hookargs):
        if self.hookargs is None:
            raise error.ProgrammingError(
                b'attempted to add hookargs to '
                b'operation after transaction started'
            )
        self.hookargs.update(hookargs)


class TransactionUnavailable(RuntimeError):
    pass


def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()


def applybundle(repo, unbundler, tr, source, url=None, remote=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs[b'bundle2'] = b'1'
        if source is not None and b'source' not in tr.hookargs:
            tr.hookargs[b'source'] = source
        if url is not None and b'url' not in tr.hookargs:
            tr.hookargs[b'url'] = url
        return processbundle(
            repo, unbundler, lambda: tr, source=source, remote=remote
        )
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr, source=source, remote=remote)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op


class partiterator:
    def __init__(self, repo, op, unbundler):
        self.repo = repo
        self.op = op
        self.unbundler = unbundler
        self.iterator = None
        self.count = 0
        self.current = None

    def __enter__(self):
        def func():
            itr = enumerate(self.unbundler.iterparts(), 1)
            for count, p in itr:
                self.count = count
                self.current = p
                yield p
                p.consume()
                self.current = None

        self.iterator = func()
        return self.iterator

    def __exit__(self, type, exc, tb):
        if not self.iterator:
            return

        # Only gracefully abort in a normal exception situation. User aborts
        # like Ctrl+C throw a KeyboardInterrupt which is not derived from
        # Exception, and should not be gracefully cleaned up.
        if isinstance(exc, Exception):
            # Any exceptions seeking to the end of the bundle at this point are
            # almost certainly related to the underlying stream being bad.
            # And, chances are that the exception we're handling is related to
            # getting in that bad state. So, we swallow the seeking error and
            # re-raise the original error.
            seekerror = False
            try:
                if self.current:
                    # consume the part content to not corrupt the stream.
                    self.current.consume()

                for part in self.iterator:
                    # consume the bundle content
                    part.consume()
            except Exception:
                seekerror = True

            # Small hack to let caller code distinguish exceptions from bundle2
            # processing from processing the old format. This is mostly needed
            # to handle different return codes to unbundle according to the
            # type of bundle. We should probably clean up or drop this return
            # code craziness in a future version.
            exc.duringunbundle2 = True
            salvaged = []
            replycaps = None
            if self.op.reply is not None:
                salvaged = self.op.reply.salvageoutput()
                replycaps = self.op.reply.capabilities
            exc._replycaps = replycaps
            exc._bundle2salvagedoutput = salvaged

            # Re-raising from a variable loses the original stack. So only use
            # that form if we need to.
            if seekerror:
                raise exc

        self.repo.ui.debug(
            b'bundle2-input-bundle: %i parts total\n' % self.count
        )


def processbundle(
    repo,
    unbundler,
    transactiongetter=None,
    op=None,
    source=b'',
    remote=None,
):
    """This function processes a bundle, applying its effects to/from a repo

    It iterates over each part, then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(
            repo,
            transactiongetter,
            source=source,
            remote=remote,
        )
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = [b'bundle2-input-bundle:']
        if unbundler.params:
            msg.append(b' %i params' % len(unbundler.params))
        if op._gettransaction is None or op._gettransaction is _notransaction:
            msg.append(b' no-transaction')
        else:
            msg.append(b' with-transaction')
        msg.append(b'\n')
        repo.ui.debug(b''.join(msg))

    processparts(repo, op, unbundler)

    return op


def processparts(repo, op, unbundler):
    with partiterator(repo, op, unbundler) as parts:
        for part in parts:
            _processpart(op, part)


def _processchangegroup(op, cg, tr, source, url, **kwargs):
    if op.remote is not None and op.remote.path is not None:
        remote_path = op.remote.path
        kwargs = kwargs.copy()
        kwargs['delta_base_reuse_policy'] = remote_path.delta_reuse_policy
    ret = cg.apply(op.repo, tr, source, url, **kwargs)
    op.records.add(
        b'changegroup',
        {
            b'return': ret,
        },
    )
    return ret


def _gethandler(op, part):
    status = b'unknown'  # used by debug output
    try:
        handler = parthandlermapping.get(part.type)
        if handler is None:
            status = b'unsupported-type'
            raise error.BundleUnknownFeatureError(parttype=part.type)
        indebug(op.ui, b'found a handler for part %s' % part.type)
        unknownparams = part.mandatorykeys - handler.params
        if unknownparams:
            unknownparams = list(unknownparams)
            unknownparams.sort()
            status = b'unsupported-params (%s)' % b', '.join(unknownparams)
            raise error.BundleUnknownFeatureError(
                parttype=part.type, params=unknownparams
            )
        status = b'supported'
    except error.BundleUnknownFeatureError as exc:
        if part.mandatory:  # mandatory parts
            raise
        indebug(op.ui, b'ignoring unsupported advisory part %s' % exc)
        return  # skip to part processing
    finally:
        if op.ui.debugflag:
            msg = [b'bundle2-input-part: "%s"' % part.type]
            if not part.mandatory:
                msg.append(b' (advisory)')
            nbmp = len(part.mandatorykeys)
            nbap = len(part.params) - nbmp
            if nbmp or nbap:
                msg.append(b' (params:')
                if nbmp:
                    msg.append(b' %i mandatory' % nbmp)
                if nbap:
                    msg.append(b' %i advisory' % nbap)
                msg.append(b')')
            msg.append(b' %s\n' % status)
            op.ui.debug(b''.join(msg))

    return handler


def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    handler = _gethandler(op, part)
    if handler is None:
        return

    # handler is called outside the above try block so that we don't
    # risk catching KeyErrors from anything other than the
    # parthandlermapping lookup (any KeyError raised by handler()
    # itself represents a defect of a different variety).
    output = None
    if op.captureoutput and op.reply is not None:
        op.ui.pushbuffer(error=True, subproc=True)
        output = b''
    try:
        handler(op, part)
    finally:
        if output is not None:
            output = op.ui.popbuffer()
        if output:
            outpart = op.reply.newpart(b'output', data=output, mandatory=False)
            outpart.addparam(
                b'in-reply-to', pycompat.bytestr(part.id), mandatory=False
            )


def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if b'=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split(b'=', 1)
            vals = vals.split(b',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps


def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = b"%s=%s" % (ca, b','.join(vals))
        chunks.append(ca)
    return b'\n'.join(chunks)
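
A round-trip sketch for these two helpers; the capability names are made up
for illustration:

    caps = {b'HG20': [], b'listkeys': [b'bookmarks', b'phases']}
    blob = encodecaps(caps)  # b'HG20\nlistkeys=bookmarks,phases'
    assert decodecaps(blob) == caps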


bundletypes = {
    b"": (b"", b'UN'),  # only when using unbundle on ssh and old http servers
    # since the unification ssh accepts a header but there
    # is no capability signaling it.
    b"HG20": (),  # special-cased below
    b"HG10UN": (b"HG10UN", b'UN'),
    b"HG10BZ": (b"HG10", b'BZ'),
    b"HG10GZ": (b"HG10GZ", b'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = [b'HG10GZ', b'HG10BZ', b'HG10UN']


class bundle20:
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = b'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype(b'UN')
        self._compopts = None
        # If compression is being handled by a consumer of the raw
        # data (e.g. the wire protocol), unsetting this flag tells
        # consumers that the bundle is best left uncompressed.
        self.prefercompressed = True

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, b'UN'):
            return
        assert not any(n.lower() == b'compression' for n, v in self._params)
        self.addparam(b'Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise error.ProgrammingError(b'empty parameter name')
        if name[0:1] not in pycompat.bytestr(
            string.ascii_letters  # pytype: disable=wrong-arg-types
        ):
            raise error.ProgrammingError(
                b'non letter first character: %s' % name
            )
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual application payload."""
        assert part.id is None
        part.id = len(self._parts)  # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means that
        any failure to properly initialize the part after calling ``newpart``
        should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = [b'bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(b' (%i params)' % len(self._params))
            msg.append(b' %i parts total\n' % len(self._parts))
            self.ui.debug(b''.join(msg))
        outdebug(self.ui, b'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, b'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(
            self._getcorechunk(), self._compopts
        ):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = b'%s=%s' % (par, value)
            blocks.append(par)
        return b' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, b'start of parts')
        for part in self._parts:
            outdebug(self.ui, b'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, b'end of bundle')
        yield _pack(_fpartheadersize, 0)

    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith(b'output'):
                salvaged.append(part.copy())
        return salvaged
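
A short sketch of driving this class end to end; it assumes an existing
`mercurial.ui.ui` instance named `ui`, and the parameter and part used are
arbitrary examples:

    bundler = bundle20(ui)
    bundler.addparam(b'nbchanges', b'0')  # advisory stream level parameter
    bundler.newpart(b'output', data=b'hello', mandatory=False)
    raw = b''.join(bundler.getchunks())
    assert raw.startswith(b'HG20')  # the magic string leads the stream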


class unpackermixin:
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)


def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != b'HG':
        ui.debug(
            b"error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version)
        )
        raise error.Abort(_(b'not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_(b'unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, b'start processing of %s stream' % magicstring)
    return unbundler
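
Reading a stream back could look like the following sketch; it assumes `ui`
and a file-like object `fp` positioned at the start of a bundle2 stream:

    unbundler = getunbundler(ui, fp)
    for part in unbundler.iterparts():
        ui.write(b'part %s (id %d)\n' % (part.type, part.id))
        # iterparts() consumes any unread payload before yielding the next part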
825
825
826
826
827 class unbundle20(unpackermixin):
827 class unbundle20(unpackermixin):
828 """interpret a bundle2 stream
828 """interpret a bundle2 stream
829
829
830 This class is fed with a binary stream and yields parts through its
830 This class is fed with a binary stream and yields parts through its
831 `iterparts` methods."""
831 `iterparts` methods."""
832
832
833 _magicstring = b'HG20'
833 _magicstring = b'HG20'
834
834
835 def __init__(self, ui, fp):
835 def __init__(self, ui, fp):
836 """If header is specified, we do not read it out of the stream."""
836 """If header is specified, we do not read it out of the stream."""
837 self.ui = ui
837 self.ui = ui
838 self._compengine = util.compengines.forbundletype(b'UN')
838 self._compengine = util.compengines.forbundletype(b'UN')
839 self._compressed = None
839 self._compressed = None
840 super(unbundle20, self).__init__(fp)
840 super(unbundle20, self).__init__(fp)
841
841
842 @util.propertycache
842 @util.propertycache
843 def params(self):
843 def params(self):
844 """dictionary of stream level parameters"""
844 """dictionary of stream level parameters"""
845 indebug(self.ui, b'reading bundle2 stream parameters')
845 indebug(self.ui, b'reading bundle2 stream parameters')
846 params = {}
846 params = {}
847 paramssize = self._unpack(_fstreamparamsize)[0]
847 paramssize = self._unpack(_fstreamparamsize)[0]
848 if paramssize < 0:
848 if paramssize < 0:
849 raise error.BundleValueError(
849 raise error.BundleValueError(
850 b'negative bundle param size: %i' % paramssize
850 b'negative bundle param size: %i' % paramssize
851 )
851 )
852 if paramssize:
852 if paramssize:
853 params = self._readexact(paramssize)
853 params = self._readexact(paramssize)
854 params = self._processallparams(params)
854 params = self._processallparams(params)
855 return params
855 return params
856
856
857 def _processallparams(self, paramsblock):
857 def _processallparams(self, paramsblock):
858 """ """
858 """ """
859 params = util.sortdict()
859 params = util.sortdict()
860 for p in paramsblock.split(b' '):
860 for p in paramsblock.split(b' '):
861 p = p.split(b'=', 1)
861 p = p.split(b'=', 1)
862 p = [urlreq.unquote(i) for i in p]
862 p = [urlreq.unquote(i) for i in p]
863 if len(p) < 2:
863 if len(p) < 2:
864 p.append(None)
864 p.append(None)
865 self._processparam(*p)
865 self._processparam(*p)
866 params[p[0]] = p[1]
866 params[p[0]] = p[1]
867 return params
867 return params
868
868
869 def _processparam(self, name, value):
869 def _processparam(self, name, value):
870 """process a parameter, applying its effect if needed
870 """process a parameter, applying its effect if needed
871
871
872 Parameter starting with a lower case letter are advisory and will be
872 Parameter starting with a lower case letter are advisory and will be
873 ignored when unknown. Those starting with an upper case letter are
873 ignored when unknown. Those starting with an upper case letter are
874 mandatory and will this function will raise a KeyError when unknown.
874 mandatory and will this function will raise a KeyError when unknown.
875
875
876 Note: no option are currently supported. Any input will be either
876 Note: no option are currently supported. Any input will be either
877 ignored or failing.
877 ignored or failing.
878 """
878 """
879 if not name:
879 if not name:
880 raise ValueError('empty parameter name')
880 raise ValueError('empty parameter name')
881 if name[0:1] not in pycompat.bytestr(
881 if name[0:1] not in pycompat.bytestr(
882 string.ascii_letters # pytype: disable=wrong-arg-types
882 string.ascii_letters # pytype: disable=wrong-arg-types
883 ):
883 ):
884 raise ValueError('non letter first character: %s' % name)
884 raise ValueError('non letter first character: %s' % name)
885 try:
885 try:
886 handler = b2streamparamsmap[name.lower()]
886 handler = b2streamparamsmap[name.lower()]
887 except KeyError:
887 except KeyError:
888 if name[0:1].islower():
888 if name[0:1].islower():
889 indebug(self.ui, b"ignoring unknown parameter %s" % name)
889 indebug(self.ui, b"ignoring unknown parameter %s" % name)
890 else:
890 else:
891 raise error.BundleUnknownFeatureError(params=(name,))
891 raise error.BundleUnknownFeatureError(params=(name,))
892 else:
892 else:
893 handler(self, name, value)
893 handler(self, name, value)
894
894
    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends, relying on the bundle
        being interpreted to find its end. This is terrible and we are sorry,
        but we needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError(
                b'negative bundle param size: %i' % paramssize
            )
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            # The payload itself is decompressed below, so drop
            # the compression parameter passed down to compensate.
            outparams = []
            for p in params.split(b' '):
                k, v = p.split(b'=', 1)
                if k.lower() != b'compression':
                    outparams.append(p)
            outparams = b' '.join(outparams)
            yield _pack(_fstreamparamsize, len(outparams))
            yield outparams
        else:
            yield _pack(_fstreamparamsize, paramssize)
        # From there, the payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError(b'negative chunk size: %i' % size)
            yield self._readexact(size)

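    # Sketch of what _forwardchunks yields for a small compressed bundle
    # (an assumed illustration, not output from a real run): the magic
    # string, a re-encoded parameter block with the compression parameter
    # stripped (the payload is forwarded decompressed), then the raw
    # size-prefixed chunks until two empty sizes close the stream:
    #
    #   b'HG20'
    #   _pack(_fstreamparamsize, 0)     params, b'Compression=BZ' dropped
    #   ... size-prefixed part chunks ...
    #   _pack(_fpartheadersize, 0)      emitted twice at end of stream
    #
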
    def iterparts(self, seekable=False):
        """yield all parts contained in the stream"""
        cls = seekableunbundlepart if seekable else unbundlepart
        # make sure params have been loaded
        self.params
        # From there, the payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, b'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = cls(self.ui, headerblock, self._fp)
            yield part
            # Ensure part is fully consumed so we can start reading the next
            # part.
            part.consume()

            headerblock = self._readpartheader()
        indebug(self.ui, b'end of bundle2 stream')

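    # Illustrative caller sketch (assumed code, not part of this module):
    #
    #   for part in unbundler.iterparts():
    #       ui.write(b'%s\n' % part.type)
    #
    # Each part is consumed before the next header is read, so a caller
    # that needs the payload must call part.read() inside the loop.
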
    def _readpartheader(self):
        """reads a part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError(
                b'negative part header size: %i' % headersize
            )
        indebug(self.ui, b'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params  # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()


formatmap = {b'20': unbundle20}

b2streamparamsmap = {}


def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""

    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func

    return decorator


@b2streamparamhandler(b'compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,), values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True


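# Illustrative example (hypothetical; b'somefeature' is a made-up name):
# a new stream level parameter handler would be registered like this:
#
#   @b2streamparamhandler(b'somefeature')
#   def processsomefeature(unbundler, param, value):
#       indebug(unbundler.ui, b'somefeature value: %s' % value)
#
# A lower case name stays advisory; an upper case first letter makes the
# parameter mandatory for the receiving side.
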
class bundlepart:
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. The remote side
    should be able to safely ignore the advisory ones.

    Neither data nor parameters may be modified after generation has begun.
    """

    def __init__(
        self,
        parttype,
        mandatoryparams=(),
        advisoryparams=(),
        data=b'',
        mandatory=True,
    ):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError(b'duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently being generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return '<%s object at %x; id: %s; type: %s; mandatory: %s>' % (
            cls,
            id(self),
            self.id,
            self.type,
            self.mandatory,
        )

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        assert not util.safehasattr(self.data, b'__next__')
        return self.__class__(
            self.type,
            self._mandatoryparams,
            self._advisoryparams,
            self._data,
            self.mandatory,
        )

    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError(b'part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value=b'', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError(b'part is being generated')
        if name in self._seenparams:
            raise ValueError(b'duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

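    # Illustrative usage sketch (assumed caller code; the part type
    # b'test:song' is made up):
    #
    #   part = bundlepart(b'test:song', data=b'la la la')
    #   part.addparam(b'randomparam', b'42', mandatory=False)
    #
    # Once getchunks() has started, further addparam() calls or data
    # assignments raise error.ReadOnlyPartError.
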
    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError(b'part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = [b'bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(b' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(b' (params:')
                if nbmp:
                    msg.append(b' %i mandatory' % nbmp)
                if nbap:
                    msg.append(b' %i advisory' % nbap)
                msg.append(b')')
            if not self.data:
                msg.append(b' empty payload')
            elif util.safehasattr(self.data, 'next') or util.safehasattr(
                self.data, b'__next__'
            ):
                msg.append(b' streamed payload')
            else:
                msg.append(b' %i bytes payload' % len(self.data))
            msg.append(b'\n')
            ui.debug(b''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, b'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
        ## parttype
        header = [
            _pack(_fparttypesize, len(parttype)),
            parttype,
            _pack(_fpartid, self.id),
        ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        try:
            headerchunk = b''.join(header)
        except TypeError:
            raise TypeError(
                'Found a non-bytes trying to '
                'build bundle part header: %r' % header
            )
        outdebug(ui, b'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, b'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug(b'bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            bexc = stringutil.forcebytestr(exc)
            # backup exception data for later
            ui.debug(
                b'bundle2-input-stream-interrupt: encoding exception %s' % bexc
            )
            tb = sys.exc_info()[2]
            msg = b'unexpected error: %s' % bexc
            interpart = bundlepart(
                b'error:abort', [(b'message', msg)], mandatory=False
            )
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, b'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, b'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if util.safehasattr(self.data, 'next') or util.safehasattr(
            self.data, b'__next__'
        ):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


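# Illustrative sketch of the frames emitted by bundlepart.getchunks for a
# small part (sizes are packed int32; the 25-byte header and b'la la la'
# payload are assumptions made up for the example):
#
#   _pack(_fpartheadersize, 25)    header size
#   <25 bytes of header>           type, id and parameters
#   _pack(_fpayloadsize, 8)        payload chunk size
#   b'la la la'                    payload chunk
#   _pack(_fpayloadsize, 0)        end-of-payload marker
#
# A chunk size equal to flaginterrupt (below) instead announces an
# out-of-band part, handled by interrupthandler.
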
flaginterrupt = -1


class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting exceptions raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError(
                b'negative part header size: %i' % headersize
            )
        indebug(self.ui, b'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):
        self.ui.debug(
            b'bundle2-input-stream-interrupt: opening out of band context\n'
        )
        indebug(self.ui, b'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, b'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        hardabort = False
        try:
            _processpart(op, part)
        except (SystemExit, KeyboardInterrupt):
            hardabort = True
            raise
        finally:
            if not hardabort:
                part.consume()
        self.ui.debug(
            b'bundle2-input-stream-interrupt: closing out of band context\n'
        )


class interruptoperation:
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError(b'no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable(b'no repo access from stream interruption')


def decodepayloadchunks(ui, fh):
    """Reads bundle2 part payload data into chunks.

    Part payload data consists of framed chunks. This function takes
    a file handle and emits those chunks.
    """
    dolog = ui.configbool(b'devel', b'bundle2.debug')
    debug = ui.debug

    headerstruct = struct.Struct(_fpayloadsize)
    headersize = headerstruct.size
    unpack = headerstruct.unpack

    readexactly = changegroup.readexactly
    read = fh.read

    chunksize = unpack(readexactly(fh, headersize))[0]
    indebug(ui, b'payload chunk size: %i' % chunksize)

    # changegroup.readexactly() is inlined below for performance.
    while chunksize:
        if chunksize >= 0:
            s = read(chunksize)
            if len(s) < chunksize:
                raise error.Abort(
                    _(
                        b'stream ended unexpectedly '
                        b'(got %d bytes, expected %d)'
                    )
                    % (len(s), chunksize)
                )

            yield s
        elif chunksize == flaginterrupt:
            # Interrupt "signal" detected. The regular stream is interrupted
            # and a bundle2 part follows. Consume it.
            interrupthandler(ui, fh)()
        else:
            raise error.BundleValueError(
                b'negative payload chunk size: %s' % chunksize
            )

        s = read(headersize)
        if len(s) < headersize:
            raise error.Abort(
                _(b'stream ended unexpectedly (got %d bytes, expected %d)')
                % (len(s), headersize)
            )

        chunksize = unpack(s)[0]

        # indebug() inlined for performance.
        if dolog:
            debug(b'bundle2-input: payload chunk size: %i\n' % chunksize)


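# Illustrative round-trip sketch (assumed test code, with a ui object at
# hand; _fpayloadsize is the int32 framing format used above):
#
#   import io
#   framed = _pack(_fpayloadsize, 3) + b'abc' + _pack(_fpayloadsize, 0)
#   assert b''.join(decodepayloadchunks(ui, io.BytesIO(framed))) == b'abc'
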
class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = util.safehasattr(fp, 'seek') and util.safehasattr(
            fp, b'tell'
        )
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._readheader()
        self._mandatory = None
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset : (offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic-related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _readheader(self):
        """read the header and set up the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, b'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, b'part id: "%s"' % pycompat.bytestr(self.id))
        # extract mandatory bit from type
        self.mandatory = self.type != self.type.lower()
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, b'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of pairs again
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # the header is fully read, record it
        self._initialized = True

    def _payloadchunks(self):
        """Generator of decoded chunks in the payload."""
        return decodepayloadchunks(self.ui, self._fp)

    def consume(self):
        """Read the part payload until completion.

        By consuming the part data, the underlying stream read offset will
        be advanced to the next part (or end of stream).
        """
        if self.consumed:
            return

        chunk = self.read(32768)
        while chunk:
            self._pos += len(chunk)
            chunk = self.read(32768)

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug(
                    b'bundle2-input-part: total payload size %i\n' % self._pos
                )
            self.consumed = True
        return data


class seekableunbundlepart(unbundlepart):
    """A bundle2 part in a bundle that is seekable.

    Regular ``unbundlepart`` instances can only be read once. This class
    extends ``unbundlepart`` to enable bi-directional seeking within the
    part.

    Bundle2 part data consists of framed chunks. Offsets when seeking
    refer to the decoded data, not the offsets in the underlying bundle2
    stream.

    To facilitate quickly seeking within the decoded data, instances of this
    class maintain a mapping between offsets in the underlying stream and
    the decoded payload. This mapping will consume memory in proportion
    to the number of chunks within the payload (which almost certainly
    increases in proportion with the size of the part).
    """

    def __init__(self, ui, header, fp):
        # (payload, file) offsets for chunk starts.
        self._chunkindex = []

        super(seekableunbundlepart, self).__init__(ui, header, fp)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, b'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), (
                b'Unknown chunk %d' % chunknum
            )
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]

        for chunk in decodepayloadchunks(self.ui, self._fp):
            chunknum += 1
            pos += len(chunk)
            if chunknum == len(self._chunkindex):
                self._chunkindex.append((pos, self._tellfp()))

            yield chunk

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError(b'Unknown chunk')

    def tell(self):
        return self._pos

    def seek(self, offset, whence=os.SEEK_SET):
        if whence == os.SEEK_SET:
            newpos = offset
        elif whence == os.SEEK_CUR:
            newpos = self._pos + offset
        elif whence == os.SEEK_END:
            if not self.consumed:
                # Can't use self.consume() here because it advances self._pos.
                chunk = self.read(32768)
                while chunk:
                    chunk = self.read(32768)
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError(b'Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            # Can't use self.consume() here because it advances self._pos.
            chunk = self.read(32768)
            while chunk:
                chunk = self.read(32768)

        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError(b'Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_(b'Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_(b'File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None


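# Illustrative seek sketch (assumed caller code): offsets given to seek()
# address the decoded payload, not the underlying bundle2 stream:
#
#   part.seek(0, os.SEEK_END)   # indexes every chunk as a side effect
#   size = part.tell()
#   part.seek(0)                # rewind to the start of the payload
#   data = part.read(size)
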
# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {
    b'HG20': (),
    b'bookmarks': (),
    b'error': (b'abort', b'unsupportedcontent', b'pushraced', b'pushkey'),
    b'listkeys': (),
    b'pushkey': (),
    b'digests': tuple(sorted(util.DIGESTS.keys())),
    b'remote-changegroup': (b'http', b'https'),
    b'hgtagsfnodes': (),
    b'phases': (b'heads',),
    b'stream': (b'v2',),
}


def getrepocaps(repo, allowpushback=False, role=None):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.

    The returned value is used for servers advertising their capabilities as
    well as clients advertising their capabilities to servers as part of
    bundle2 requests. The ``role`` argument specifies which is which.
    """
    if role not in (b'client', b'server'):
        raise error.ProgrammingError(b'role argument must be client or server')

    caps = capabilities.copy()
    caps[b'changegroup'] = tuple(
        sorted(changegroup.supportedincomingversions(repo))
    )
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple(b'V%i' % v for v in obsolete.formats)
        caps[b'obsmarkers'] = supportedformat
    if allowpushback:
        caps[b'pushback'] = ()
    cpmode = repo.ui.config(b'server', b'concurrent-push-mode')
    if cpmode == b'check-related':
        caps[b'checkheads'] = (b'related',)
    if b'phases' in repo.ui.configlist(b'devel', b'legacy.exchange'):
        caps.pop(b'phases')

    # Don't advertise stream clone support in server mode if not configured.
    if role == b'server':
        streamsupported = repo.ui.configbool(
            b'server', b'uncompressed', untrusted=True
        )
        featuresupported = repo.ui.configbool(b'server', b'bundle2.stream')

        if not streamsupported or not featuresupported:
            caps.pop(b'stream')
    # Else always advertise support on client, because payload support
    # should always be advertised.

    # b'rev-branch-cache' is no longer advertised, but still supported
    # for legacy clients.

    return caps


def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable(b'bundle2')
    if not raw and raw != b'':
        return {}
    capsblob = urlreq.unquote(remote.capable(b'bundle2'))
    return decodecaps(capsblob)


def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict"""
    obscaps = caps.get(b'obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith(b'V')]


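# Illustrative example: with caps = {b'obsmarkers': (b'V0', b'V1')},
# obsmarkersversion(caps) returns [0, 1]; entries not starting with b'V'
# and other capability keys are ignored.
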
def writenewbundle(
    ui,
    repo,
    source,
    filename,
    bundletype,
    outgoing,
    opts,
    vfs=None,
    compression=None,
    compopts=None,
    allow_internal=False,
):
    if bundletype.startswith(b'HG10'):
        cg = changegroup.makechangegroup(repo, outgoing, b'01', source)
        return writebundle(
            ui,
            cg,
            filename,
            bundletype,
            vfs=vfs,
            compression=compression,
            compopts=compopts,
        )
    elif not bundletype.startswith(b'HG20'):
        raise error.ProgrammingError(b'unknown bundle type: %s' % bundletype)

    # enforce that no internal changesets are to be bundled
    bundled_internal = repo.revs(b"%ln and _internal()", outgoing.ancestorsof)
    if bundled_internal and not allow_internal:
        count = len(repo.revs(b'%ln and _internal()', outgoing.missing))
        msg = "backup bundle would contain %d internal changesets"
        msg %= count
        raise error.ProgrammingError(msg)

    caps = {}
    if opts.get(b'obsolescence', False):
        caps[b'obsmarkers'] = (b'V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)


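# Illustrative opt-in sketch (assumed caller code, with made-up argument
# values): a caller that legitimately needs internal changesets in a
# bundle, e.g. a backup bundle written while internal-phase changesets
# exist, must say so explicitly:
#
#   writenewbundle(ui, repo, b'strip', filename, b'HG20', outgoing,
#                  opts, allow_internal=True)
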
def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The types of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we might not always want a changegroup in such bundle, for example in
    # stream bundles
    if opts.get(b'changegroup', True):
        cgversion = opts.get(b'cg.version')
        if cgversion is None:
            cgversion = changegroup.safeversion(repo)
        cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
        part = bundler.newpart(b'changegroup', data=cg.getchunks())
        part.addparam(b'version', cg.version)
        if b'clcount' in cg.extras:
            part.addparam(
                b'nbchanges', b'%d' % cg.extras[b'clcount'], mandatory=False
            )
        if opts.get(b'phases'):
            target_phase = phases.draft
            for head in outgoing.ancestorsof:
                target_phase = max(target_phase, repo[head].phase())
            if target_phase > phases.draft:
                part.addparam(
                    b'targetphase',
                    b'%d' % target_phase,
                    mandatory=False,
                )
        if repository.REPO_FEATURE_SIDE_DATA in repo.features:
            part.addparam(b'exp-sidedata', b'1')

    if opts.get(b'streamv2', False):
        addpartbundlestream2(bundler, repo, stream=True)

    if opts.get(b'tagsfnodescache', True):
        addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get(b'revbranchcache', True):
        addpartrevbranchcache(repo, bundler, outgoing)

    if opts.get(b'obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(
            bundler,
            obsmarkers,
            mandatory=opts.get(b'obsolescence-mandatory', True),
        )

    if opts.get(b'phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = phases.binaryencode(headsbyphase)
        bundler.newpart(b'phase-heads', data=phasedata)


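# Illustrative shape of the ``opts`` mapping consumed above (keys taken
# from the lookups in _addpartsfromopts; the values are assumptions):
#
#   opts = {
#       b'changegroup': True,
#       b'cg.version': b'02',
#       b'streamv2': False,
#       b'tagsfnodescache': True,
#       b'revbranchcache': True,
#       b'obsolescence': False,
#       b'phases': True,
#   }
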
def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundled changesets
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.ancestorsof:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart(b'hgtagsfnodes', data=b''.join(chunks))


def addpartrevbranchcache(repo, bundler, outgoing):
    # we include the rev branch cache for the bundle changeset
    # (as an optional part)
    cache = repo.revbranchcache()
    cl = repo.unfiltered().changelog
    branchesdata = collections.defaultdict(lambda: (set(), set()))
    for node in outgoing.missing:
        branch, close = cache.branchinfo(cl.rev(node))
        branchesdata[branch][close].add(node)

    def generate():
        for branch, (nodes, closed) in sorted(branchesdata.items()):
            utf8branch = encoding.fromlocal(branch)
            yield rbcstruct.pack(len(utf8branch), len(nodes), len(closed))
            yield utf8branch
            for n in sorted(nodes):
                yield n
            for n in sorted(closed):
                yield n

    bundler.newpart(b'cache:rev-branch-cache', data=generate(), mandatory=False)


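# Payload sketch for the 'cache:rev-branch-cache' part emitted by
# addpartrevbranchcache() above: per branch, an rbcstruct header
# ('>III': branch-name length, open-node count, closed-node count),
# then the UTF-8 branch name and the sorted 20-byte nodes. For instance,
# branch b'default' with two open nodes and no closed ones starts with
# rbcstruct.pack(7, 2, 0) + b'default'.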
def _formatrequirementsspec(requirements):
    requirements = [req for req in requirements if req != b"shared"]
    return urlreq.quote(b','.join(sorted(requirements)))


def _formatrequirementsparams(requirements):
    requirements = _formatrequirementsspec(requirements)
    params = b"%s%s" % (urlreq.quote(b"requirements="), requirements)
    return params


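# For instance (illustrative values):
#   _formatrequirementsspec([b'revlogv1', b'store'])
#     -> b'revlogv1%2Cstore'
#   _formatrequirementsparams([b'revlogv1', b'store'])
#     -> b'requirements%3Drevlogv1%2Cstore'
# Note that b'shared' is always filtered out of the spec.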
def format_remote_wanted_sidedata(repo):
    """Formats a repo's wanted sidedata categories into a bytestring for
    capabilities exchange."""
    wanted = b""
    if repo._wanted_sidedata:
        wanted = b','.join(
            pycompat.bytestr(c) for c in sorted(repo._wanted_sidedata)
        )
    return wanted


def read_remote_wanted_sidedata(remote):
    sidedata_categories = remote.capable(b'exp-wanted-sidedata')
    return read_wanted_sidedata(sidedata_categories)


def read_wanted_sidedata(formatted):
    if formatted:
        return set(formatted.split(b','))
    return set()


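# Round-trip sketch (category names are hypothetical): a repo wanting
# {b'cat-a', b'cat-b'} formats to b'cat-a,cat-b', and
# read_wanted_sidedata(b'cat-a,cat-b') yields {b'cat-a', b'cat-b'} back.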
def addpartbundlestream2(bundler, repo, **kwargs):
    if not kwargs.get('stream', False):
        return

    if not streamclone.allowservergeneration(repo):
        raise error.Abort(
            _(
                b'stream data requested but server does not allow '
                b'this feature'
            ),
            hint=_(
                b'well-behaved clients should not be '
                b'requesting stream data from servers not '
                b'advertising it; the client may be buggy'
            ),
        )

    # Stream clones don't compress well. And compression undermines a
    # goal of stream clones, which is to be fast. Communicate the desire
    # to avoid compression to consumers of the bundle.
    bundler.prefercompressed = False

    # get the includes and excludes
    includepats = kwargs.get('includepats')
    excludepats = kwargs.get('excludepats')

    narrowstream = repo.ui.configbool(
        b'experimental', b'server.stream-narrow-clones'
    )

    if (includepats or excludepats) and not narrowstream:
        raise error.Abort(_(b'server does not support narrow stream clones'))

    includeobsmarkers = False
    if repo.obsstore:
        remoteversions = obsmarkersversion(bundler.capabilities)
        if not remoteversions:
            raise error.Abort(
                _(
                    b'server has obsolescence markers, but client '
                    b'cannot receive them via stream clone'
                )
            )
        elif repo.obsstore._version in remoteversions:
            includeobsmarkers = True

    filecount, bytecount, it = streamclone.generatev2(
        repo, includepats, excludepats, includeobsmarkers
    )
    requirements = streamclone.streamed_requirements(repo)
    requirements = _formatrequirementsspec(requirements)
    part = bundler.newpart(b'stream2', data=it)
    part.addparam(b'bytecount', b'%d' % bytecount, mandatory=True)
    part.addparam(b'filecount', b'%d' % filecount, mandatory=True)
    part.addparam(b'requirements', requirements, mandatory=True)


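# A resulting 'stream2' part carries three mandatory params, for example
# (illustrative values): bytecount=b'123456', filecount=b'42',
# requirements=b'generaldelta%2Crevlogv1%2Cstore'. The receiving side is
# handlestreamv2bundle() further down.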
def buildobsmarkerspart(bundler, markers, mandatory=True):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError(b'bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart(b'obsmarkers', data=stream, mandatory=mandatory)


def writebundle(
    ui, cg, filename, bundletype, vfs=None, compression=None, compopts=None
):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == b"HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart(b'changegroup', data=cg.getchunks())
        part.addparam(b'version', cg.version)
        if b'clcount' in cg.extras:
            part.addparam(
                b'nbchanges', b'%d' % cg.extras[b'clcount'], mandatory=False
            )
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != b'01':
            raise error.Abort(
                _(b'old bundle types only support v1 changegroups')
            )

        # HG20 is the case without 2 values to unpack, but is handled above.
        # pytype: disable=bad-unpacking
        header, comp = bundletypes[bundletype]
        # pytype: enable=bad-unpacking

        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_(b'unknown stream compression type: %s') % comp)
        compengine = util.compengines.forbundletype(comp)

        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk

        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)


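# Usage sketch for writebundle() (not from the original source; engine
# names assumed from Mercurial's compression tables):
#   writebundle(ui, cg, b'backup.hg', b'HG10UN')  # v1, uncompressed
#   writebundle(ui, cg, b'backup.hg', b'HG20', compression=b'BZ')
# For pre-HG20 types the compression is implied by the bundletype string
# (b'HG10BZ' means bzip2), so the compression argument must stay None.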
def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get(b'return', 0) for r in op.records[b'changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result


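# Return-value sketch for combinechangegroupresults(): each addchangegroup
# result is 1 for "no heads changed", 1+n when n heads were added, and
# -1-n when n heads were removed. Combining [3, 2] therefore gives
# changedheads == 3 and a final result of 4, while any 0 short-circuits
# the whole combination to 0.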
@parthandler(
    b'changegroup',
    (
        b'version',
        b'nbchanges',
        b'exp-sidedata',
        b'exp-wanted-sidedata',
        b'treemanifest',
        b'targetphase',
    ),
)
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo"""
    from . import localrepo

    tr = op.gettransaction()
    unpackerversion = inpart.params.get(b'version', b'01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the ones contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if b'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get(b'nbchanges'))
    if b'treemanifest' in inpart.params and not scmutil.istreemanifest(op.repo):
        if len(op.repo.changelog) != 0:
            raise error.Abort(
                _(
                    b"bundle contains tree manifests, but local repo is "
                    b"non-empty and does not use tree manifests"
                )
            )
        op.repo.requirements.add(requirements.TREEMANIFEST_REQUIREMENT)
        op.repo.svfs.options = localrepo.resolvestorevfsoptions(
            op.repo.ui, op.repo.requirements, op.repo.features
        )
        scmutil.writereporequirements(op.repo)

    extrakwargs = {}
    targetphase = inpart.params.get(b'targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)

    remote_sidedata = inpart.params.get(b'exp-wanted-sidedata')
    extrakwargs['sidedata_categories'] = read_wanted_sidedata(remote_sidedata)

    ret = _processchangegroup(
        op,
        cg,
        tr,
        op.source,
        b'bundle2',
        expectedtotal=nbchangesets,
        **extrakwargs
    )
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart(b'reply:changegroup', mandatory=False)
        part.addparam(
            b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
        )
        part.addparam(b'return', b'%i' % ret, mandatory=False)
    assert not inpart.read()


_remotechangegroupparams = tuple(
    [b'url', b'size', b'digests']
    + [b'digest:%s' % k for k in util.DIGESTS.keys()]
)


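# Assuming util.DIGESTS exposes md5, sha1 and sha512 (an assumption about
# the current digest table), this expands to roughly:
# (b'url', b'size', b'digests', b'digest:md5', b'digest:sha1',
#  b'digest:sha512')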
@parthandler(b'remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
    - url: the url to the bundle10.
    - size: the bundle10 file size. It is used to validate what was
      retrieved by the client matches the server knowledge about the bundle.
    - digests: a space separated list of the digest types provided as
      parameters.
    - digest:<digest-type>: the hexadecimal representation of the digest with
      that name. Like the size, it is used to validate what was retrieved by
      the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params[b'url']
    except KeyError:
        raise error.Abort(_(b'remote-changegroup: missing "%s" param') % b'url')
    parsed_url = urlutil.url(raw_url)
    if parsed_url.scheme not in capabilities[b'remote-changegroup']:
        raise error.Abort(
            _(b'remote-changegroup does not support %s urls')
            % parsed_url.scheme
        )

    try:
        size = int(inpart.params[b'size'])
    except ValueError:
        raise error.Abort(
            _(b'remote-changegroup: invalid value for param "%s"') % b'size'
        )
    except KeyError:
        raise error.Abort(
            _(b'remote-changegroup: missing "%s" param') % b'size'
        )

    digests = {}
    for typ in inpart.params.get(b'digests', b'').split():
        param = b'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(
                _(b'remote-changegroup: missing "%s" param') % param
            )
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange

    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(
            _(b'%s: not a bundle version 1.0') % urlutil.hidepassword(raw_url)
        )
    ret = _processchangegroup(op, cg, tr, op.source, b'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart(b'reply:changegroup')
        part.addparam(
            b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
        )
        part.addparam(b'return', b'%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(
            _(b'bundle at %s is corrupted:\n%s')
            % (urlutil.hidepassword(raw_url), e.message)
        )
    assert not inpart.read()


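# Example part params (made-up values): url=b'https://example.com/pull.hg',
# size=b'4096', digests=b'sha1', digest:sha1=b'<40 hex digits>'. The
# payload is then fetched from the url and every advertised digest is
# checked against the downloaded bytes by real_part.validate().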
@parthandler(b'reply:changegroup', (b'return', b'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params[b'return'])
    replyto = int(inpart.params[b'in-reply-to'])
    op.records.add(b'changegroup', {b'return': ret}, replyto)


@parthandler(b'check:bookmarks')
def handlecheckbookmarks(op, inpart):
    """check location of bookmarks

    This part is to be used to detect push races regarding bookmarks. It
    contains binary encoded (bookmark, node) tuples. If the local state does
    not match the one in the part, a PushRaced exception is raised.
    """
    bookdata = bookmarks.binarydecode(op.repo, inpart)

    msgstandard = (
        b'remote repository changed while pushing - please try again '
        b'(bookmark "%s" moved from %s to %s)'
    )
    msgmissing = (
        b'remote repository changed while pushing - please try again '
        b'(bookmark "%s" is missing, expected %s)'
    )
    msgexist = (
        b'remote repository changed while pushing - please try again '
        b'(bookmark "%s" set on %s, expected missing)'
    )
    for book, node in bookdata:
        currentnode = op.repo._bookmarks.get(book)
        if currentnode != node:
            if node is None:
                finalmsg = msgexist % (book, short(currentnode))
            elif currentnode is None:
                finalmsg = msgmissing % (book, short(node))
            else:
                finalmsg = msgstandard % (
                    book,
                    short(node),
                    short(currentnode),
                )
            raise error.PushRaced(finalmsg)


@parthandler(b'check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced(
            b'remote repository changed while pushing - please try again'
        )


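# Payload framing for 'check:heads' (and 'check:updated-heads' below): a
# bare concatenation of 20-byte binary nodes with no count prefix. Two
# expected heads make a 40-byte payload, and the read loop stops at the
# first short read.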
@parthandler(b'check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually
    updated during the push. If other activities happen on unrelated heads,
    they are ignored.

    This allows servers with high traffic to avoid push contention as long
    as only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().iterheads():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced(
                b'remote repository changed while pushing - '
                b'please try again'
            )


@parthandler(b'check:phases')
def handlecheckphases(op, inpart):
    """check that phase boundaries of the repository did not change

    This is used to detect a push race.
    """
    phasetonodes = phases.binarydecode(inpart)
    unfi = op.repo.unfiltered()
    cl = unfi.changelog
    phasecache = unfi._phasecache
    msg = (
        b'remote repository changed while pushing - please try again '
        b'(%s is %s expected %s)'
    )
    for expectedphase, nodes in phasetonodes.items():
        for n in nodes:
            actualphase = phasecache.phase(unfi, cl.rev(n))
            if actualphase != expectedphase:
                finalmsg = msg % (
                    short(n),
                    phases.phasenames[actualphase],
                    phases.phasenames[expectedphase],
                )
                raise error.PushRaced(finalmsg)


@parthandler(b'output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_(b'remote: %s\n') % line)


@parthandler(b'replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)


class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""


@parthandler(b'error:abort', (b'message', b'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(
        inpart.params[b'message'], hint=inpart.params.get(b'hint')
    )


@parthandler(
    b'error:pushkey',
    (b'namespace', b'key', b'new', b'old', b'ret', b'in-reply-to'),
)
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in (b'namespace', b'key', b'new', b'old', b'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(
        inpart.params[b'in-reply-to'], **pycompat.strkwargs(kwargs)
    )


@parthandler(b'error:unsupportedcontent', (b'parttype', b'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get(b'parttype')
    if parttype is not None:
        kwargs[b'parttype'] = parttype
    params = inpart.params.get(b'params')
    if params is not None:
        kwargs[b'params'] = params.split(b'\0')

    raise error.BundleUnknownFeatureError(**pycompat.strkwargs(kwargs))


@parthandler(b'error:pushraced', (b'message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_(b'push failed:'), inpart.params[b'message'])


@parthandler(b'listkeys', (b'namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params[b'namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add(b'listkeys', (namespace, r))


@parthandler(b'pushkey', (b'namespace', b'key', b'old', b'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params[b'namespace'])
    key = dec(inpart.params[b'key'])
    old = dec(inpart.params[b'old'])
    new = dec(inpart.params[b'new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {b'namespace': namespace, b'key': key, b'old': old, b'new': new}
    op.records.add(b'pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart(b'reply:pushkey')
        rpart.addparam(
            b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
        )
        rpart.addparam(b'return', b'%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in (b'namespace', b'key', b'new', b'old', b'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(
            partid=b'%d' % inpart.id, **pycompat.strkwargs(kwargs)
        )


@parthandler(b'bookmarks')
def handlebookmark(op, inpart):
    """transmit bookmark information

    The part contains binary encoded bookmark information.

    The exact behavior of this part can be controlled by the 'bookmarks' mode
    on the bundle operation.

    When mode is 'apply' (the default) the bookmark information is applied as
    is to the unbundling repository. Make sure a 'check:bookmarks' part is
    issued earlier to check for push races in such an update. This behavior
    is suitable for pushing.

    When mode is 'records', the information is recorded into the 'bookmarks'
    records of the bundle operation. This behavior is suitable for pulling.
    """
    changes = bookmarks.binarydecode(op.repo, inpart)

    pushkeycompat = op.repo.ui.configbool(
        b'server', b'bookmarks-pushkey-compat'
    )
    bookmarksmode = op.modes.get(b'bookmarks', b'apply')

    if bookmarksmode == b'apply':
        tr = op.gettransaction()
        bookstore = op.repo._bookmarks
        if pushkeycompat:
            allhooks = []
            for book, node in changes:
                hookargs = tr.hookargs.copy()
                hookargs[b'pushkeycompat'] = b'1'
                hookargs[b'namespace'] = b'bookmarks'
                hookargs[b'key'] = book
                hookargs[b'old'] = hex(bookstore.get(book, b''))
                hookargs[b'new'] = hex(node if node is not None else b'')
                allhooks.append(hookargs)

            for hookargs in allhooks:
                op.repo.hook(
                    b'prepushkey', throw=True, **pycompat.strkwargs(hookargs)
                )

        for book, node in changes:
            if bookmarks.isdivergent(book):
                msg = _(b'cannot accept divergent bookmark %s!') % book
                raise error.Abort(msg)

        bookstore.applychanges(op.repo, op.gettransaction(), changes)

        if pushkeycompat:

            def runhook(unused_success):
                for hookargs in allhooks:
                    op.repo.hook(b'pushkey', **pycompat.strkwargs(hookargs))

            op.repo._afterlock(runhook)

    elif bookmarksmode == b'records':
        for book, node in changes:
            record = {b'bookmark': book, b'node': node}
            op.records.add(b'bookmarks', record)
    else:
        raise error.ProgrammingError(
            b'unknown bookmark mode: %s' % bookmarksmode
        )


@parthandler(b'phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = phases.binarydecode(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)


@parthandler(b'reply:pushkey', (b'return', b'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params[b'return'])
    partid = int(inpart.params[b'in-reply-to'])
    op.records.add(b'pushkey', {b'return': ret}, partid)


@parthandler(b'obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config(b'experimental', b'obsmarkers-exchange-debug'):
        op.ui.writenoi18n(
            b'obsmarker-exchange: %i bytes received\n' % len(markerdata)
        )
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug(
            b'ignoring obsolescence markers, feature not enabled\n'
        )
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    op.records.add(b'obsmarkers', {b'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart(b'reply:obsmarkers')
        rpart.addparam(
            b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
        )
        rpart.addparam(b'new', b'%i' % new, mandatory=False)


@parthandler(b'reply:obsmarkers', (b'new', b'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params[b'new'])
    partid = int(inpart.params[b'in-reply-to'])
    op.records.add(b'obsmarkers', {b'new': ret}, partid)


@parthandler(b'hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug(b'ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug(b'applied %i hgtags fnodes cache entries\n' % count)


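# Payload sketch: a flat sequence of 20-byte pairs (changeset node, then
# its .hgtags filenode), mirroring what addparttagsfnodescache() emits;
# three cached heads therefore produce a 120-byte payload.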
rbcstruct = struct.Struct(b'>III')


@parthandler(b'cache:rev-branch-cache')
def handlerbc(op, inpart):
    """Legacy part, ignored for compatibility with bundles from or
    for Mercurial before 5.7. Newer Mercurial computes the cache
    efficiently enough during unbundling that the additional transfer
    is unnecessary."""


@parthandler(b'pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on server-side for security reasons
    if op.ui.configbool(b'push', b'pushvars.server'):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvar flag.
            key = b"USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)


@parthandler(b'stream2', (b'requirements', b'filecount', b'bytecount'))
def handlestreamv2bundle(op, part):
    requirements = urlreq.unquote(part.params[b'requirements'])
    requirements = requirements.split(b',') if requirements else []
    filecount = int(part.params[b'filecount'])
    bytecount = int(part.params[b'bytecount'])

    repo = op.repo
    if len(repo):
        msg = _(b'cannot apply stream clone to non-empty repository')
        raise error.Abort(msg)

    repo.ui.debug(b'applying stream bundle\n')
    streamclone.applybundlev2(repo, part, filecount, bytecount, requirements)


def widen_bundle(
    bundler, repo, oldmatcher, newmatcher, common, known, cgversion, ellipses
):
    """generates bundle2 for widening a narrow clone

    bundler is the bundle to which data should be added
    repo is the localrepository instance
    oldmatcher matches what the client already has
    newmatcher matches what the client needs (including what it already has)
    common is a set of common heads between server and client
    known is a set of revs known on the client side (used in ellipses)
    cgversion is the changegroup version to send
    ellipses is a boolean value telling whether to send ellipses data or not

    returns the bundle2 with the data required for extending
    """
    commonnodes = set()
    cl = repo.changelog
    for r in repo.revs(b"::%ln", common):
        commonnodes.add(cl.node(r))
    if commonnodes:
        packer = changegroup.getbundler(
            cgversion,
            repo,
            oldmatcher=oldmatcher,
            matcher=newmatcher,
            fullnodes=commonnodes,
        )
        cgdata = packer.generate(
            {repo.nullid},
            list(commonnodes),
            False,
            b'narrow_widen',
            changelog=False,
        )

        part = bundler.newpart(b'changegroup', data=cgdata)
        part.addparam(b'version', cgversion)
        if scmutil.istreemanifest(repo):
            part.addparam(b'treemanifest', b'1')
        if repository.REPO_FEATURE_SIDE_DATA in repo.features:
            part.addparam(b'exp-sidedata', b'1')
            wanted = format_remote_wanted_sidedata(repo)
            part.addparam(b'exp-wanted-sidedata', wanted)

    return bundler
@@ -1,579 +1,588 @@ repair.py
# repair.py - functions for repository repair for mercurial
#
# Copyright 2005, 2006 Chris Mason <mason@suse.com>
# Copyright 2007 Olivia Mackall
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.


import errno

from .i18n import _
from .node import (
    hex,
    short,
)
from . import (
    bundle2,
    changegroup,
    discovery,
    error,
    exchange,
    obsolete,
    obsutil,
    pathutil,
    phases,
    requirements,
    scmutil,
    util,
)
from .utils import (
    hashutil,
    stringutil,
    urlutil,
)


def backupbundle(
-    repo, bases, heads, node, suffix, compress=True, obsolescence=True
+    repo,
+    bases,
+    heads,
+    node,
+    suffix,
+    compress=True,
+    obsolescence=True,
+    tmp_backup=False,
):
41 """create a bundle with the specified revisions as a backup"""
48 """create a bundle with the specified revisions as a backup"""
42
49
43 backupdir = b"strip-backup"
50 backupdir = b"strip-backup"
44 vfs = repo.vfs
51 vfs = repo.vfs
45 if not vfs.isdir(backupdir):
52 if not vfs.isdir(backupdir):
46 vfs.mkdir(backupdir)
53 vfs.mkdir(backupdir)
47
54
48 # Include a hash of all the nodes in the filename for uniqueness
55 # Include a hash of all the nodes in the filename for uniqueness
49 allcommits = repo.set(b'%ln::%ln', bases, heads)
56 allcommits = repo.set(b'%ln::%ln', bases, heads)
50 allhashes = sorted(c.hex() for c in allcommits)
57 allhashes = sorted(c.hex() for c in allcommits)
51 totalhash = hashutil.sha1(b''.join(allhashes)).digest()
58 totalhash = hashutil.sha1(b''.join(allhashes)).digest()
52 name = b"%s/%s-%s-%s.hg" % (
59 name = b"%s/%s-%s-%s.hg" % (
53 backupdir,
60 backupdir,
54 short(node),
61 short(node),
55 hex(totalhash[:4]),
62 hex(totalhash[:4]),
56 suffix,
63 suffix,
57 )
64 )
58
65
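    # Changegroup version b'01' is the only version the legacy HG10* bundle
    # types can carry; anything newer has to travel in a bundle2 (HG20)
    # container, where compression is handled at the bundle2 level (the
    # 'comp' value passed below).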
    cgversion = changegroup.localversion(repo)
    comp = None
    if cgversion != b'01':
        bundletype = b"HG20"
        if compress:
            comp = b'BZ'
    elif compress:
        bundletype = b"HG10BZ"
    else:
        bundletype = b"HG10UN"

    outgoing = discovery.outgoing(repo, missingroots=bases, ancestorsof=heads)
    contentopts = {
        b'cg.version': cgversion,
        b'obsolescence': obsolescence,
        b'phases': True,
    }
    return bundle2.writenewbundle(
        repo.ui,
        repo,
        b'strip',
        name,
        bundletype,
        outgoing,
        contentopts,
        vfs,
        compression=comp,
+        allow_internal=tmp_backup,
    )


def _collectfiles(repo, striprev):
    """find out the filelogs affected by the strip"""
    files = set()

    for x in range(striprev, len(repo)):
        files.update(repo[x].files())

    return sorted(files)


def _collectrevlog(revlog, striprev):
    """return the linkrevs of the revisions 'revlog' would lose when
    stripping at 'striprev' (revlogs are append-only, so revisions linked
    to earlier changesets can be truncated along with the stripped ones)"""
    _, brokenset = revlog.getstrippoint(striprev)
    return [revlog.linkrev(r) for r in brokenset]


def _collectbrokencsets(repo, files, striprev):
    """return the changesets which will be broken by the truncation"""
    s = set()

    for revlog in manifestrevlogs(repo):
        s.update(_collectrevlog(revlog, striprev))
    for fname in files:
        s.update(_collectrevlog(repo.file(fname), striprev))

    return s


def strip(ui, repo, nodelist, backup=True, topic=b'backup'):
    # This function requires the caller to lock the repo, but it operates
    # within a transaction of its own, and thus requires there to be no current
    # transaction when it is called.
    if repo.currenttransaction() is not None:
        raise error.ProgrammingError(b'cannot strip from inside a transaction')

    # Simple way to maintain backwards compatibility for this
    # argument.
    if backup in [b'none', b'strip']:
        backup = False

    repo = repo.unfiltered()
    repo.destroying()
    vfs = repo.vfs
    # load bookmark before changelog to avoid side effect from outdated
    # changelog (see repo._refreshchangelog)
    repo._bookmarks
    cl = repo.changelog

    # TODO handle undo of merge sets
    if isinstance(nodelist, bytes):
        nodelist = [nodelist]
    striplist = [cl.rev(node) for node in nodelist]
    striprev = min(striplist)

    files = _collectfiles(repo, striprev)
    saverevs = _collectbrokencsets(repo, files, striprev)

    # Some revisions with rev > striprev may not be descendants of striprev.
    # We have to find these revisions and put them in a bundle, so that
    # we can restore them after the truncations.
    # To create the bundle we use repo.changegroupsubset which requires
    # the list of heads and bases of the set of interesting revisions.
    # (head = revision in the set that has no descendant in the set;
    # base = revision in the set that has no ancestor in the set)
    tostrip = set(striplist)
    saveheads = set(saverevs)
    for r in cl.revs(start=striprev + 1):
        if any(p in tostrip for p in cl.parentrevs(r)):
            tostrip.add(r)

        if r not in tostrip:
            saverevs.add(r)
            saveheads.difference_update(cl.parentrevs(r))
            saveheads.add(r)
    saveheads = [cl.node(r) for r in saveheads]

    # compute base nodes
    if saverevs:
        descendants = set(cl.descendants(saverevs))
        saverevs.difference_update(descendants)
    savebases = [cl.node(r) for r in saverevs]
    stripbases = [cl.node(r) for r in tostrip]

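    # Obsolescence markers exclusive to the stripped changesets are deleted
    # along with them (devel.strip-obsmarkers defaults to on); see
    # obsutil.exclusivemarkers() for the exact definition of "exclusive".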
    stripobsidx = obsmarkers = ()
    if repo.ui.configbool(b'devel', b'strip-obsmarkers'):
        obsmarkers = obsutil.exclusivemarkers(repo, stripbases)
    if obsmarkers:
        stripobsidx = [
            i for i, m in enumerate(repo.obsstore) if m in obsmarkers
        ]

    newbmtarget, updatebm = _bookmarkmovements(repo, tostrip)

    backupfile = None
    node = nodelist[-1]
    if backup:
        backupfile = _createstripbackup(repo, stripbases, node, topic)
    # create a changegroup for all the branches we need to keep
    tmpbundlefile = None
    if saveheads:
        # do not compress temporary bundle if we remove it from disk later
        #
        # We do not include obsolescence, as it might re-introduce prune
        # markers we are trying to strip. This is harmless since the stripped
        # markers are already backed up and we did not touch the markers for
        # the saved changesets.
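        #
        # The bundle is flagged as a temporary backup (tmp_backup=True maps
        # to writenewbundle's allow_internal), so internal changesets, which
        # are normally kept out of bundles, can still be saved here and
        # restored after the truncation.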
        tmpbundlefile = backupbundle(
            repo,
            savebases,
            saveheads,
            node,
            b'temp',
            compress=False,
            obsolescence=False,
+            tmp_backup=True,
        )

    with ui.uninterruptible():
        try:
            with repo.transaction(b"strip") as tr:
                # TODO this code violates the interface abstraction of the
                # transaction and makes assumptions that file storage is
                # using append-only files. We'll need some kind of storage
                # API to handle stripping for us.
                oldfiles = set(tr._offsetmap.keys())
                oldfiles.update(tr._newfiles)

                tr.startgroup()
                cl.strip(striprev, tr)
                stripmanifest(repo, striprev, tr, files)

                for fn in files:
                    repo.file(fn).strip(striprev, tr)
                tr.endgroup()

                entries = tr.readjournal()

                for file, troffset in entries:
                    if file in oldfiles:
                        continue
                    with repo.svfs(file, b'a', checkambig=True) as fp:
                        fp.truncate(troffset)
                    if troffset == 0:
                        repo.store.markremoved(file)

            deleteobsmarkers(repo.obsstore, stripobsidx)
            del repo.obsstore
            repo.invalidatevolatilesets()
            repo._phasecache.filterunknown(repo)

            if tmpbundlefile:
                ui.note(_(b"adding branch\n"))
                f = vfs.open(tmpbundlefile, b"rb")
                gen = exchange.readbundle(ui, f, tmpbundlefile, vfs)
                # silence internal shuffling chatter
                maybe_silent = (
                    repo.ui.silent()
                    if not repo.ui.verbose
                    else util.nullcontextmanager()
                )
                with maybe_silent:
                    tmpbundleurl = b'bundle:' + vfs.join(tmpbundlefile)
                    txnname = b'strip'
                    if not isinstance(gen, bundle2.unbundle20):
                        txnname = b"strip\n%s" % urlutil.hidepassword(
                            tmpbundleurl
                        )
                    with repo.transaction(txnname) as tr:
                        bundle2.applybundle(
                            repo, gen, tr, source=b'strip', url=tmpbundleurl
                        )
                f.close()

            with repo.transaction(b'repair') as tr:
                bmchanges = [(m, repo[newbmtarget].node()) for m in updatebm]
                repo._bookmarks.applychanges(repo, tr, bmchanges)

            # remove undo files
            for undovfs, undofile in repo.undofiles():
                try:
                    undovfs.unlink(undofile)
                except OSError as e:
                    if e.errno != errno.ENOENT:
                        ui.warn(
                            _(b'error removing %s: %s\n')
                            % (
                                undovfs.join(undofile),
                                stringutil.forcebytestr(e),
                            )
                        )

        except:  # re-raises
            if backupfile:
                ui.warn(
                    _(b"strip failed, backup bundle stored in '%s'\n")
                    % vfs.join(backupfile)
                )
            if tmpbundlefile:
                ui.warn(
                    _(b"strip failed, unrecovered changes stored in '%s'\n")
                    % vfs.join(tmpbundlefile)
                )
                ui.warn(
                    _(
                        b"(fix the problem, then recover the changesets with "
                        b"\"hg unbundle '%s'\")\n"
                    )
                    % vfs.join(tmpbundlefile)
                )
            raise
        else:
            if tmpbundlefile:
                # Remove temporary bundle only if there were no exceptions
                vfs.unlink(tmpbundlefile)

    repo.destroyed()
    # return the backup file path (or None if 'backup' was False) so
    # extensions can use it
    return backupfile


def softstrip(ui, repo, nodelist, backup=True, topic=b'backup'):
    """perform a "soft" strip using the archived phase"""
    tostrip = [c.node() for c in repo.set(b'sort(%ln::)', nodelist)]
    if not tostrip:
        return None

    backupfile = None
    if backup:
        node = tostrip[0]
        backupfile = _createstripbackup(repo, tostrip, node, topic)

    newbmtarget, updatebm = _bookmarkmovements(repo, tostrip)
    with repo.transaction(b'strip') as tr:
        phases.retractboundary(repo, tr, phases.archived, tostrip)
        bmchanges = [(m, repo[newbmtarget].node()) for m in updatebm]
        repo._bookmarks.applychanges(repo, tr, bmchanges)
    return backupfile


def _bookmarkmovements(repo, tostrip):
    # compute necessary bookmark movement
    bm = repo._bookmarks
    updatebm = []
    for m in bm:
        rev = repo[bm[m]].rev()
        if rev in tostrip:
            updatebm.append(m)
    newbmtarget = None
    # If we need to move bookmarks, compute bookmark
    # targets. Otherwise we can skip doing this logic.
    if updatebm:
        # For a set s, max(parents(s) - s) is the same as max(heads(::s - s)),
        # but is much faster
        newbmtarget = repo.revs(b'max(parents(%ld) - (%ld))', tostrip, tostrip)
        if newbmtarget:
            newbmtarget = repo[newbmtarget.first()].node()
        else:
            newbmtarget = b'.'
    return newbmtarget, updatebm


def _createstripbackup(repo, stripbases, node, topic):
    # back up the changesets we are about to strip
    vfs = repo.vfs
    unfi = repo.unfiltered()
    to_node = unfi.changelog.node
    # Internal changesets are an implementation detail; they should neither
    # leave the repository nor be exposed to the users. In addition, features
    # using them are required to be resistant to strip. See the test cases
    # for more details.
    all_backup = unfi.revs(
        b"(%ln)::(%ld) and not _internal()",
        stripbases,
        unfi.changelog.headrevs(),
    )
    if not all_backup:
        return None

    def to_nodes(revs):
        return [to_node(r) for r in revs]

    bases = to_nodes(unfi.revs("roots(%ld)", all_backup))
    heads = to_nodes(unfi.revs("heads(%ld)", all_backup))
    backupfile = backupbundle(repo, bases, heads, node, topic)
    repo.ui.status(_(b"saved backup bundle to %s\n") % vfs.join(backupfile))
    repo.ui.log(
        b"backupbundle", b"saved backup bundle to %s\n", vfs.join(backupfile)
    )
    return backupfile


def safestriproots(ui, repo, nodes):
    """return list of roots of nodes where descendants are covered by nodes"""
    torev = repo.unfiltered().changelog.rev
    revs = {torev(n) for n in nodes}
    # tostrip = wanted - unsafe = wanted - ancestors(orphaned)
    # orphaned = affected - wanted
    # affected = descendants(roots(wanted))
    # wanted = revs
    revset = b'%ld - ( ::( (roots(%ld):: and not _phase(%s)) -%ld) )'
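    # i.e. strip 'revs' minus the ancestors of the orphaned changesets
    # (affected descendants that were not requested); internal-phase
    # changesets are left out when computing the affected set.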
    tostrip = set(repo.revs(revset, revs, revs, phases.internal, revs))
    notstrip = revs - tostrip
    if notstrip:
        nodestr = b', '.join(sorted(short(repo[n].node()) for n in notstrip))
        ui.warn(
            _(b'warning: orphaned descendants detected, not stripping %s\n')
            % nodestr
        )
    return [c.node() for c in repo.set(b'roots(%ld)', tostrip)]


class stripcallback:
    """used as a transaction postclose callback"""

    def __init__(self, ui, repo, backup, topic):
        self.ui = ui
        self.repo = repo
        self.backup = backup
        self.topic = topic or b'backup'
        self.nodelist = []

    def addnodes(self, nodes):
        self.nodelist.extend(nodes)

    def __call__(self, tr):
        roots = safestriproots(self.ui, self.repo, self.nodelist)
        if roots:
            strip(self.ui, self.repo, roots, self.backup, self.topic)


def delayedstrip(ui, repo, nodelist, topic=None, backup=True):
    """like strip, but works inside a transaction and won't strip irrelevant revs

    nodelist must explicitly contain all descendants. Otherwise a warning will
    be printed that some nodes are not stripped.

    Will do a backup if `backup` is True. The last non-None "topic" will be
    used as the backup topic name. The default backup topic name is "backup".
    """
    tr = repo.currenttransaction()
    if not tr:
        nodes = safestriproots(ui, repo, nodelist)
        return strip(ui, repo, nodes, backup=backup, topic=topic)
    # transaction postclose callbacks are called in alphabet order.
    # use '\xff' as prefix so we are likely to be called last.
    callback = tr.getpostclose(b'\xffstrip')
    if callback is None:
        callback = stripcallback(ui, repo, backup=backup, topic=topic)
        tr.addpostclose(b'\xffstrip', callback)
    if topic:
        callback.topic = topic
    callback.addnodes(nodelist)


def stripmanifest(repo, striprev, tr, files):
    for revlog in manifestrevlogs(repo):
        revlog.strip(striprev, tr)


def manifestrevlogs(repo):
    yield repo.manifestlog.getstorage(b'')
    if scmutil.istreemanifest(repo):
        # This logic is safe if treemanifest isn't enabled, but also
        # pointless, so we skip it if treemanifest isn't enabled.
        for t, unencoded, size in repo.store.datafiles():
            if unencoded.startswith(b'meta/') and unencoded.endswith(
                b'00manifest.i'
            ):
                dir = unencoded[5:-12]
                yield repo.manifestlog.getstorage(dir)


def rebuildfncache(ui, repo, only_data=False):
    """Rebuilds the fncache file from repo history.

    Missing entries will be added. Extra entries will be removed.
    """
    repo = repo.unfiltered()

    if requirements.FNCACHE_REQUIREMENT not in repo.requirements:
        ui.warn(
            _(
                b'(not rebuilding fncache because repository does not '
                b'support fncache)\n'
            )
        )
        return

    with repo.lock():
        fnc = repo.store.fncache
        fnc.ensureloaded(warn=ui.warn)

        oldentries = set(fnc.entries)
        newentries = set()
        seenfiles = set()

        if only_data:
            # Trust the listing of .i from the fncache, but not the .d. This
            # is much faster, because we only need to stat every possible .d
            # file instead of reading the full changelog.
            for f in fnc:
                if f[:5] == b'data/' and f[-2:] == b'.i':
                    seenfiles.add(f[5:-2])
                    newentries.add(f)
                    dataf = f[:-2] + b'.d'
                    if repo.store._exists(dataf):
                        newentries.add(dataf)
        else:
            progress = ui.makeprogress(
                _(b'rebuilding'), unit=_(b'changesets'), total=len(repo)
            )
            for rev in repo:
                progress.update(rev)

                ctx = repo[rev]
                for f in ctx.files():
                    # This is to minimize I/O.
                    if f in seenfiles:
                        continue
                    seenfiles.add(f)

                    i = b'data/%s.i' % f
                    d = b'data/%s.d' % f

                    if repo.store._exists(i):
                        newentries.add(i)
                    if repo.store._exists(d):
                        newentries.add(d)

            progress.complete()

        if requirements.TREEMANIFEST_REQUIREMENT in repo.requirements:
            # This logic is safe if treemanifest isn't enabled, but also
            # pointless, so we skip it if treemanifest isn't enabled.
            for dir in pathutil.dirs(seenfiles):
                i = b'meta/%s/00manifest.i' % dir
                d = b'meta/%s/00manifest.d' % dir

                if repo.store._exists(i):
                    newentries.add(i)
                if repo.store._exists(d):
                    newentries.add(d)

        addcount = len(newentries - oldentries)
        removecount = len(oldentries - newentries)
        for p in sorted(oldentries - newentries):
            ui.write(_(b'removing %s\n') % p)
        for p in sorted(newentries - oldentries):
            ui.write(_(b'adding %s\n') % p)

        if addcount or removecount:
            ui.write(
                _(b'%d items added, %d removed from fncache\n')
                % (addcount, removecount)
            )
            fnc.entries = newentries
            fnc._dirty = True

            with repo.transaction(b'fncache') as tr:
                fnc.write(tr)
        else:
            ui.write(_(b'fncache already up to date\n'))


def deleteobsmarkers(obsstore, indices):
    """Delete some obsmarkers from obsstore and return how many were deleted

    'indices' is a list of ints which are the indices
    of the markers to be deleted.

    Every invocation of this function completely rewrites the obsstore file,
    skipping the markers we want removed. A new temporary file is created,
    the remaining markers are written there, and on .close() this file gets
    atomically renamed to obsstore, thus guaranteeing consistency."""
    if not indices:
        # we don't want to rewrite the obsstore with the same content
        return

    left = []
    current = obsstore._all
    n = 0
    for i, m in enumerate(current):
        if i in indices:
            n += 1
            continue
        left.append(m)

    newobsstorefile = obsstore.svfs(b'obsstore', b'w', atomictemp=True)
    for bytes in obsolete.encodemarkers(left, True, obsstore._version):
        newobsstorefile.write(bytes)
    newobsstorefile.close()
    return n