bundle2: remove restriction around sidedata...
Raphaël Gomès
r47845:1680c947 default
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.

Payload part
------------------------

The binary format is as follows:

:header size: int32

  The total number of bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

    The header defines how to interpret the part. It contains two pieces of
    data: the part type, and the part parameters.

    The part type is used to route to an application level handler that can
    interpret the payload.

    Part parameters are passed to the application level handler. They are
    meant to convey information that will help the application level object
    interpret the part payload.

    The binary format of the header is as follows:

    :typesize: (one byte)

    :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

    :partid: A 32-bit integer (unique in the bundle) that can be used to
             refer to this part.

    :parameters:

        A part's parameters may have arbitrary content; the binary structure
        is::

            <mandatory-count><advisory-count><param-sizes><param-data>

        :mandatory-count: 1 byte, number of mandatory parameters

        :advisory-count: 1 byte, number of advisory parameters

        :param-sizes:

            N pairs of bytes, where N is the total number of parameters. Each
            pair contains (<size-of-key>, <size-of-value>) for one parameter.

        :param-data:

            A blob of bytes from which each parameter key and value can be
            retrieved using the list of size pairs stored in the previous
            field.

        Mandatory parameters come first, then the advisory ones.

        Each parameter's key MUST be unique within the part.

:payload:

    The payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part
type contains any uppercase char it is considered mandatory. When no handler
is known for a mandatory part, the process is aborted and an exception is
raised. If the part is advisory and no handler is known, the part is ignored.
When the process is aborted, the full bundle is still read from the stream to
keep the channel usable. But none of the parts read after an abort are
processed. In the future, dropping the stream may become an option for
channels we do not care to preserve.
"""

from __future__ import absolute_import, division

import collections
import errno
import os
import re
import string
import struct
import sys

from .i18n import _
from .node import (
    hex,
    short,
)
from . import (
    bookmarks,
    changegroup,
    encoding,
    error,
    obsolete,
    phases,
    pushkey,
    pycompat,
    requirements,
    scmutil,
    streamclone,
    tags,
    url,
    util,
)
from .utils import (
    stringutil,
    urlutil,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = b'>i'
_fpartheadersize = b'>i'
_fparttypesize = b'>B'
_fpartid = b'>I'
_fpayloadsize = b'>i'
_fpartparamcount = b'>BB'

preferedchunksize = 32768

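The `_fpayloadsize` format above (`>i`, a signed big-endian int32) drives the `<chunksize><chunkdata>` payload framing described in the module docstring: chunks follow each other until a zero-size chunk terminates the payload, and negative sizes are reserved for special-case processing. A minimal sketch of a conforming reader (illustrative only, not part of bundle2.py):

```python
import io
import struct


def iter_payload_chunks(stream):
    """Yield payload chunks until the zero-size terminator chunk."""
    while True:
        chunksize = struct.unpack(b'>i', stream.read(4))[0]
        if chunksize == 0:
            return  # end-of-payload marker
        if chunksize < 0:
            # reserved for special case processing, none defined yet
            raise ValueError('negative chunk sizes are not handled here')
        yield stream.read(chunksize)


# one 3-byte chunk followed by the zero-size terminator
payload = io.BytesIO(struct.pack(b'>i', 3) + b'abc' + struct.pack(b'>i', 0))
chunks = list(iter_payload_chunks(payload))
```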
_parttypeforbidden = re.compile(b'[^a-zA-Z0-9_:-]')


def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool(b'devel', b'bundle2.debug'):
        ui.debug(b'bundle2-output: %s\n' % message)


def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool(b'devel', b'bundle2.debug'):
        ui.debug(b'bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return b'>' + (b'BB' * nbparams)


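`_makefpartparamsizes` builds its struct format dynamically because the parameter count varies per part; the resulting format decodes the `(<size-of-key>, <size-of-value>)` pairs documented in the module docstring. A small illustration (the standalone helper name is made up; the format construction mirrors the function above):

```python
import struct


def makefpartparamsizes(nbparams):
    # same dynamic format construction as _makefpartparamsizes above
    return b'>' + (b'BB' * nbparams)


# Two parameters -> format b'>BBBB', i.e. four unsigned bytes.
fmt = makefpartparamsizes(2)
sizes = struct.unpack(fmt, bytes([3, 5, 4, 0]))
# Regroup the flat tuple into (key-size, value-size) pairs.
pairs = list(zip(sizes[0::2], sizes[1::2]))
```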
parthandlermapping = {}


def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)

    def _decorator(func):
        lparttype = parttype.lower()  # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func

    return _decorator

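The registration flow of `parthandler` can be exercised end to end. The following standalone sketch re-creates the mapping and decorator purely for illustration (the module-level `parthandlermapping` above is the authoritative one); note how lower-casing the part type gives the case-insensitive lookup described in the module docstring:

```python
import re

_parttypeforbidden = re.compile(b'[^a-zA-Z0-9_:-]')
parthandlermapping = {}


def parthandler(parttype, params=()):
    # validate, then register under the lower-cased type
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

    def _decorator(func):
        lparttype = parttype.lower()  # enforce lower case matching
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func

    return _decorator


@parthandler(b'myparttype', (b'mandatory',))
def myparttypehandler(op, part):
    """process a part of type "myparttype"."""


# Lookup is case insensitive: the part type is lower-cased first.
handler = parthandlermapping.get(b'MyPartType'.lower())
```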
class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`. Where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__


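The record-keeping behaviour documented in the `unbundlerecords` docstring (per-category access plus chronological iteration) can be shown with a trimmed-down re-implementation; this sketch omits the reply tracking and is illustrative only:

```python
class unbundlerecords:
    """Minimal sketch: entries grouped by category, kept in order."""

    def __init__(self):
        self._categories = {}
        self._sequences = []

    def add(self, category, entry):
        # record under its category and in the chronological sequence
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)


records = unbundlerecords()
records.add(b'changegroup', {b'return': 1})
records.add(b'pushkey', {b'ret': True})
```

`records[b'changegroup']` returns only that category, while iterating `records` replays every addition in order.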
class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object currently has very little content; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True, source=b''):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.reply = None
        self.captureoutput = captureoutput
        self.hookargs = {}
        self._gettransaction = transactiongetter
        # carries values that can modify part behavior
        self.modes = {}
        self.source = source

    def gettransaction(self):
        transaction = self._gettransaction()

        if self.hookargs:
            # the ones added to the transaction supersede those added
            # to the operation.
            self.hookargs.update(transaction.hookargs)
            transaction.hookargs = self.hookargs

        # mark the hookargs as flushed. further attempts to add to
        # hookargs will result in an abort.
        self.hookargs = None

        return transaction

    def addhookargs(self, hookargs):
        if self.hookargs is None:
            raise error.ProgrammingError(
                b'attempted to add hookargs to '
                b'operation after transaction started'
            )
        self.hookargs.update(hookargs)

class TransactionUnavailable(RuntimeError):
    pass


def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs[b'bundle2'] = b'1'
        if source is not None and b'source' not in tr.hookargs:
            tr.hookargs[b'source'] = source
        if url is not None and b'url' not in tr.hookargs:
            tr.hookargs[b'url'] = url
        return processbundle(repo, unbundler, lambda: tr, source=source)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr, source=source)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op

class partiterator(object):
    def __init__(self, repo, op, unbundler):
        self.repo = repo
        self.op = op
        self.unbundler = unbundler
        self.iterator = None
        self.count = 0
        self.current = None

    def __enter__(self):
        def func():
            itr = enumerate(self.unbundler.iterparts(), 1)
            for count, p in itr:
                self.count = count
                self.current = p
                yield p
                p.consume()
                self.current = None

        self.iterator = func()
        return self.iterator

    def __exit__(self, type, exc, tb):
        if not self.iterator:
            return

        # Only gracefully abort in a normal exception situation. User aborts
        # like Ctrl+C throw a KeyboardInterrupt, which does not derive from
        # Exception, and should not trigger a graceful cleanup.
        if isinstance(exc, Exception):
            # Any exceptions seeking to the end of the bundle at this point are
            # almost certainly related to the underlying stream being bad.
            # And, chances are that the exception we're handling is related to
            # getting in that bad state. So, we swallow the seeking error and
            # re-raise the original error.
            seekerror = False
            try:
                if self.current:
                    # consume the part content to not corrupt the stream.
                    self.current.consume()

                for part in self.iterator:
                    # consume the bundle content
                    part.consume()
            except Exception:
                seekerror = True

            # Small hack to let caller code distinguish exceptions from bundle2
            # processing from processing the old format. This is mostly needed
            # to handle different return codes to unbundle according to the
            # type of bundle. We should probably clean up or drop this return
            # code craziness in a future version.
            exc.duringunbundle2 = True
            salvaged = []
            replycaps = None
            if self.op.reply is not None:
                salvaged = self.op.reply.salvageoutput()
                replycaps = self.op.reply.capabilities
            exc._replycaps = replycaps
            exc._bundle2salvagedoutput = salvaged

            # Re-raising from a variable loses the original stack. So only use
            # that form if we need to.
            if seekerror:
                raise exc

        self.repo.ui.debug(
            b'bundle2-input-bundle: %i parts total\n' % self.count
        )

def processbundle(repo, unbundler, transactiongetter=None, op=None, source=b''):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part, then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter, source=source)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = [b'bundle2-input-bundle:']
        if unbundler.params:
            msg.append(b' %i params' % len(unbundler.params))
        if op._gettransaction is None or op._gettransaction is _notransaction:
            msg.append(b' no-transaction')
        else:
            msg.append(b' with-transaction')
        msg.append(b'\n')
        repo.ui.debug(b''.join(msg))

    processparts(repo, op, unbundler)

    return op

def processparts(repo, op, unbundler):
    with partiterator(repo, op, unbundler) as parts:
        for part in parts:
            _processpart(op, part)


def _processchangegroup(op, cg, tr, source, url, **kwargs):
    ret = cg.apply(op.repo, tr, source, url, **kwargs)
    op.records.add(
        b'changegroup',
        {
            b'return': ret,
        },
    )
    return ret

507 def _gethandler(op, part):
507 def _gethandler(op, part):
508 status = b'unknown' # used by debug output
508 status = b'unknown' # used by debug output
509 try:
509 try:
510 handler = parthandlermapping.get(part.type)
510 handler = parthandlermapping.get(part.type)
511 if handler is None:
511 if handler is None:
512 status = b'unsupported-type'
512 status = b'unsupported-type'
513 raise error.BundleUnknownFeatureError(parttype=part.type)
513 raise error.BundleUnknownFeatureError(parttype=part.type)
514 indebug(op.ui, b'found a handler for part %s' % part.type)
514 indebug(op.ui, b'found a handler for part %s' % part.type)
515 unknownparams = part.mandatorykeys - handler.params
515 unknownparams = part.mandatorykeys - handler.params
516 if unknownparams:
516 if unknownparams:
517 unknownparams = list(unknownparams)
517 unknownparams = list(unknownparams)
518 unknownparams.sort()
518 unknownparams.sort()
519 status = b'unsupported-params (%s)' % b', '.join(unknownparams)
519 status = b'unsupported-params (%s)' % b', '.join(unknownparams)
520 raise error.BundleUnknownFeatureError(
520 raise error.BundleUnknownFeatureError(
521 parttype=part.type, params=unknownparams
521 parttype=part.type, params=unknownparams
522 )
522 )
523 status = b'supported'
523 status = b'supported'
524 except error.BundleUnknownFeatureError as exc:
524 except error.BundleUnknownFeatureError as exc:
525 if part.mandatory: # mandatory parts
525 if part.mandatory: # mandatory parts
526 raise
526 raise
527 indebug(op.ui, b'ignoring unsupported advisory part %s' % exc)
527 indebug(op.ui, b'ignoring unsupported advisory part %s' % exc)
528 return # skip to part processing
528 return # skip to part processing
529 finally:
529 finally:
530 if op.ui.debugflag:
530 if op.ui.debugflag:
531 msg = [b'bundle2-input-part: "%s"' % part.type]
531 msg = [b'bundle2-input-part: "%s"' % part.type]
532 if not part.mandatory:
532 if not part.mandatory:
533 msg.append(b' (advisory)')
533 msg.append(b' (advisory)')
534 nbmp = len(part.mandatorykeys)
534 nbmp = len(part.mandatorykeys)
535 nbap = len(part.params) - nbmp
535 nbap = len(part.params) - nbmp
536 if nbmp or nbap:
536 if nbmp or nbap:
537 msg.append(b' (params:')
537 msg.append(b' (params:')
538 if nbmp:
538 if nbmp:
539 msg.append(b' %i mandatory' % nbmp)
539 msg.append(b' %i mandatory' % nbmp)
540 if nbap:
540 if nbap:
                    msg.append(b' %i advisory' % nbap)
                msg.append(b')')
            msg.append(b' %s\n' % status)
            op.ui.debug(b''.join(msg))

    return handler


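The mandatory/advisory dispatch rule that `_gethandler` implements can be sketched standalone; `Part` and `handlers` below are hypothetical stand-ins for illustration, not Mercurial's own types:

```python
from collections import namedtuple

# Hypothetical stand-ins for bundle2's part objects and handler registry.
Part = namedtuple('Part', ['type', 'mandatory'])
handlers = {'changegroup': lambda part: 'applied %s' % part.type}


def dispatch(part):
    """Unknown mandatory parts abort; unknown advisory parts are skipped."""
    handler = handlers.get(part.type)
    if handler is None:
        if part.mandatory:
            raise ValueError('unsupported mandatory part: %s' % part.type)
        return None  # advisory and unknown: safe to ignore
    return handler(part)
```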
def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    handler = _gethandler(op, part)
    if handler is None:
        return

    # handler is called outside the above try block so that we don't
    # risk catching KeyErrors from anything other than the
    # parthandlermapping lookup (any KeyError raised by handler()
    # itself represents a defect of a different variety).
    output = None
    if op.captureoutput and op.reply is not None:
        op.ui.pushbuffer(error=True, subproc=True)
        output = b''
    try:
        handler(op, part)
    finally:
        if output is not None:
            output = op.ui.popbuffer()
        if output:
            outpart = op.reply.newpart(b'output', data=output, mandatory=False)
            outpart.addparam(
                b'in-reply-to', pycompat.bytestr(part.id), mandatory=False
            )


def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if b'=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split(b'=', 1)
            vals = vals.split(b',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps


def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = b"%s=%s" % (ca, b','.join(vals))
        chunks.append(ca)
    return b'\n'.join(chunks)


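A minimal standalone round-trip sketch of this caps blob format, using `urllib.parse` in place of Mercurial's `urlreq` wrapper and str instead of bytes for brevity:

```python
from urllib.parse import quote, unquote


def encode_caps(caps):
    # one capability per line; values comma-joined, everything percent-quoted
    chunks = []
    for name in sorted(caps):
        vals = [quote(v) for v in caps[name]]
        line = quote(name)
        if vals:
            line = '%s=%s' % (line, ','.join(vals))
        chunks.append(line)
    return '\n'.join(chunks)


def decode_caps(blob):
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' in line:
            name, vals = line.split('=', 1)
            vals = vals.split(',')
        else:
            name, vals = line, []
        caps[unquote(name)] = [unquote(v) for v in vals]
    return caps
```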
bundletypes = {
    b"": (b"", b'UN'),  # only when using unbundle on ssh and old http servers
    # since the unification ssh accepts a header but there
    # is no capability signaling it.
    b"HG20": (),  # special-cased below
    b"HG10UN": (b"HG10UN", b'UN'),
    b"HG10BZ": (b"HG10", b'BZ'),
    b"HG10GZ": (b"HG10GZ", b'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = [b'HG10GZ', b'HG10BZ', b'HG10UN']


class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add a stream level parameter, and `newpart`
    to populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = b'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype(b'UN')
        self._compopts = None
        # If compression is being handled by a consumer of the raw
        # data (e.g. the wire protocol), unsetting this flag tells
        # consumers that the bundle is best left uncompressed.
        self.prefercompressed = True

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, b'UN'):
            return
        assert not any(n.lower() == b'compression' for n, v in self._params)
        self.addparam(b'Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise error.ProgrammingError(b'empty parameter name')
        if name[0:1] not in pycompat.bytestr(
            string.ascii_letters  # pytype: disable=wrong-arg-types
        ):
            raise error.ProgrammingError(
                b'non letter first character: %s' % name
            )
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual application payload."""
        assert part.id is None
        part.id = len(self._parts)  # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means that
        any failure to properly initialize the part after calling ``newpart``
        should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = [b'bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(b' (%i params)' % len(self._params))
            msg.append(b' %i parts total\n' % len(self._parts))
            self.ui.debug(b''.join(msg))
        outdebug(self.ui, b'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, b'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(
            self._getcorechunk(), self._compopts
        ):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = b'%s=%s' % (par, value)
            blocks.append(par)
        return b' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, b'start of parts')
        for part in self._parts:
            outdebug(self.ui, b'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, b'end of bundle')
        yield _pack(_fpartheadersize, 0)

    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith(b'output'):
                salvaged.append(part.copy())
        return salvaged


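The stream header that `getchunks` emits can be sketched standalone: the magic string, a signed 32-bit big-endian parameter-block size (matching the module's `_fstreamparamsize` format), then the space-separated parameter block. Percent-quoting of names and values is omitted here for brevity, so this is an illustrative sketch rather than a wire-accurate encoder:

```python
import struct


def frame_stream_header(params):
    # params is a list of (name, value) byte pairs; value may be None
    block = b' '.join(
        name if value is None else b'%s=%s' % (name, value)
        for name, value in params
    )
    return b'HG20' + struct.pack('>i', len(block)) + block
```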
class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)


def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != b'HG':
        ui.debug(
            b"error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version)
        )
        raise error.Abort(_(b'not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_(b'unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, b'start processing of %s stream' % magicstring)
    return unbundler


class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = b'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype(b'UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, b'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError(
                b'negative bundle param size: %i' % paramssize
            )
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process and register all parameters from a params block"""
        params = util.sortdict()
        for p in paramsblock.split(b' '):
            p = p.split(b'=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params

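The parameter-block parsing done by `_processallparams` can be sketched standalone (str instead of bytes, `urllib.parse` in place of Mercurial's `urlreq` wrapper): items are space-separated, each a bare `name` or `name=value`, with both halves percent-quoted.

```python
from urllib.parse import unquote


def parse_params_block(block):
    # a bare name gets the value None, mirroring _processallparams
    params = {}
    for item in block.split(' '):
        name, sep, value = item.partition('=')
        params[unquote(name)] = unquote(value) if sep else None
    return params
```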
    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory, and this function will raise a KeyError when they are
        unknown.

        Note: no options are currently supported. Any input will either be
        ignored or fail.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0:1] not in pycompat.bytestr(
            string.ascii_letters  # pytype: disable=wrong-arg-types
        ):
            raise ValueError('non letter first character: %s' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0:1].islower():
                indebug(self.ui, b"ignoring unknown parameter %s" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends, relying on the bundle
        being interpreted to find its end. This is terrible and we are sorry,
        but we needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError(
                b'negative bundle param size: %i' % paramssize
            )
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            # The payload itself is decompressed below, so drop
            # the compression parameter passed down to compensate.
            outparams = []
            for p in params.split(b' '):
                k, v = p.split(b'=', 1)
                if k.lower() != b'compression':
                    outparams.append(p)
            outparams = b' '.join(outparams)
            yield _pack(_fstreamparamsize, len(outparams))
            yield outparams
        else:
            yield _pack(_fstreamparamsize, paramssize)
        # From there, the payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError(b'negative chunk size: %i' % size)
            yield self._readexact(size)

    def iterparts(self, seekable=False):
        """yield all parts contained in the stream"""
        cls = seekableunbundlepart if seekable else unbundlepart
        # make sure params have been loaded
        self.params
        # From there, the payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, b'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = cls(self.ui, headerblock, self._fp)
            yield part
            # Ensure part is fully consumed so we can start reading the next
            # part.
            part.consume()

            headerblock = self._readpartheader()
        indebug(self.ui, b'end of bundle2 stream')

    def _readpartheader(self):
        """reads a part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError(
                b'negative part header size: %i' % headersize
            )
        indebug(self.ui, b'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params  # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()


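The part framing walked by `_forwardchunks` and `iterparts` can be sketched standalone: each part is a signed 32-bit big-endian header size, the header blob, then size-prefixed payload chunks ending at a zero-size chunk; an empty (zero-size) header ends the whole stream. The hypothetical scanner below ignores interrupt chunks (negative sizes) and compression:

```python
import io
import struct


def scan_parts(fp):
    """Collect (header, payload) pairs from a bundle2 part stream."""
    parts = []
    while True:
        (hsize,) = struct.unpack('>i', fp.read(4))
        if hsize == 0:
            break  # empty header: end of the part stream
        header = fp.read(hsize)
        chunks = []
        while True:
            (psize,) = struct.unpack('>i', fp.read(4))
            if psize == 0:
                break  # empty chunk: end of this part's payload
            chunks.append(fp.read(psize))
        parts.append((header, b''.join(chunks)))
    return parts
```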
formatmap = {b'20': unbundle20}

b2streamparamsmap = {}


def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""

    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func

    return decorator


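A hypothetical standalone version of this registration pattern: the decorator factory captures the parameter name, the decorated function becomes the handler, and duplicate registration is rejected up front. Names here (`registry`, `streamparamhandler`) are illustrative, not Mercurial's:

```python
registry = {}


def streamparamhandler(name):
    def decorator(func):
        assert name not in registry, 'duplicate handler for %s' % name
        registry[name] = func
        return func

    return decorator


@streamparamhandler('compression')
def handle_compression(state, name, value):
    # a stand-in for processcompression: record the chosen engine
    state['compengine'] = value
```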
@b2streamparamhandler(b'compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,), values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True


class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. The remote side
    should be able to safely ignore the advisory ones.

    Neither data nor parameters can be modified after generation has begun.
    """

    def __init__(
        self,
        parttype,
        mandatoryparams=(),
        advisoryparams=(),
        data=b'',
        mandatory=True,
    ):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError(b'duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently being generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return '<%s object at %x; id: %s; type: %s; mandatory: %s>' % (
            cls,
            id(self),
            self.id,
            self.type,
            self.mandatory,
        )

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(
            self.type,
            self._mandatoryparams,
            self._advisoryparams,
            self._data,
            self.mandatory,
        )

    # methods used to define the part content
    @property
    def data(self):
        return self._data
1056 return self._data
1057
1057
1058 @data.setter
1058 @data.setter
1059 def data(self, data):
1059 def data(self, data):
1060 if self._generated is not None:
1060 if self._generated is not None:
1061 raise error.ReadOnlyPartError(b'part is being generated')
1061 raise error.ReadOnlyPartError(b'part is being generated')
1062 self._data = data
1062 self._data = data
1063
1063
1064 @property
1064 @property
1065 def mandatoryparams(self):
1065 def mandatoryparams(self):
1066 # make it an immutable tuple to force people through ``addparam``
1066 # make it an immutable tuple to force people through ``addparam``
1067 return tuple(self._mandatoryparams)
1067 return tuple(self._mandatoryparams)
1068
1068
1069 @property
1069 @property
1070 def advisoryparams(self):
1070 def advisoryparams(self):
1071 # make it an immutable tuple to force people through ``addparam``
1071 # make it an immutable tuple to force people through ``addparam``
1072 return tuple(self._advisoryparams)
1072 return tuple(self._advisoryparams)
1073
1073
1074 def addparam(self, name, value=b'', mandatory=True):
1074 def addparam(self, name, value=b'', mandatory=True):
1075 """add a parameter to the part
1075 """add a parameter to the part
1076
1076
1077 If 'mandatory' is set to True, the remote handler must claim support
1077 If 'mandatory' is set to True, the remote handler must claim support
1078 for this parameter or the unbundling will be aborted.
1078 for this parameter or the unbundling will be aborted.
1079
1079
1080 The 'name' and 'value' cannot exceed 255 bytes each.
1080 The 'name' and 'value' cannot exceed 255 bytes each.
1081 """
1081 """
1082 if self._generated is not None:
1082 if self._generated is not None:
1083 raise error.ReadOnlyPartError(b'part is being generated')
1083 raise error.ReadOnlyPartError(b'part is being generated')
1084 if name in self._seenparams:
1084 if name in self._seenparams:
1085 raise ValueError(b'duplicated params: %s' % name)
1085 raise ValueError(b'duplicated params: %s' % name)
1086 self._seenparams.add(name)
1086 self._seenparams.add(name)
1087 params = self._advisoryparams
1087 params = self._advisoryparams
1088 if mandatory:
1088 if mandatory:
1089 params = self._mandatoryparams
1089 params = self._mandatoryparams
1090 params.append((name, value))
1090 params.append((name, value))
1091
1091
1092 # methods used to generates the bundle2 stream
1092 # methods used to generates the bundle2 stream
1093 def getchunks(self, ui):
1093 def getchunks(self, ui):
1094 if self._generated is not None:
1094 if self._generated is not None:
1095 raise error.ProgrammingError(b'part can only be consumed once')
1095 raise error.ProgrammingError(b'part can only be consumed once')
1096 self._generated = False
1096 self._generated = False
1097
1097
1098 if ui.debugflag:
1098 if ui.debugflag:
1099 msg = [b'bundle2-output-part: "%s"' % self.type]
1099 msg = [b'bundle2-output-part: "%s"' % self.type]
1100 if not self.mandatory:
1100 if not self.mandatory:
1101 msg.append(b' (advisory)')
1101 msg.append(b' (advisory)')
1102 nbmp = len(self.mandatoryparams)
1102 nbmp = len(self.mandatoryparams)
1103 nbap = len(self.advisoryparams)
1103 nbap = len(self.advisoryparams)
1104 if nbmp or nbap:
1104 if nbmp or nbap:
1105 msg.append(b' (params:')
1105 msg.append(b' (params:')
1106 if nbmp:
1106 if nbmp:
1107 msg.append(b' %i mandatory' % nbmp)
1107 msg.append(b' %i mandatory' % nbmp)
1108 if nbap:
1108 if nbap:
1109 msg.append(b' %i advisory' % nbmp)
1109 msg.append(b' %i advisory' % nbmp)
1110 msg.append(b')')
1110 msg.append(b')')
1111 if not self.data:
1111 if not self.data:
1112 msg.append(b' empty payload')
1112 msg.append(b' empty payload')
1113 elif util.safehasattr(self.data, 'next') or util.safehasattr(
1113 elif util.safehasattr(self.data, 'next') or util.safehasattr(
1114 self.data, b'__next__'
1114 self.data, b'__next__'
1115 ):
1115 ):
1116 msg.append(b' streamed payload')
1116 msg.append(b' streamed payload')
1117 else:
1117 else:
1118 msg.append(b' %i bytes payload' % len(self.data))
1118 msg.append(b' %i bytes payload' % len(self.data))
1119 msg.append(b'\n')
1119 msg.append(b'\n')
1120 ui.debug(b''.join(msg))
1120 ui.debug(b''.join(msg))
1121
1121
1122 #### header
1122 #### header
1123 if self.mandatory:
1123 if self.mandatory:
1124 parttype = self.type.upper()
1124 parttype = self.type.upper()
1125 else:
1125 else:
1126 parttype = self.type.lower()
1126 parttype = self.type.lower()
1127 outdebug(ui, b'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1127 outdebug(ui, b'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1128 ## parttype
1128 ## parttype
1129 header = [
1129 header = [
1130 _pack(_fparttypesize, len(parttype)),
1130 _pack(_fparttypesize, len(parttype)),
1131 parttype,
1131 parttype,
1132 _pack(_fpartid, self.id),
1132 _pack(_fpartid, self.id),
1133 ]
1133 ]
1134 ## parameters
1134 ## parameters
1135 # count
1135 # count
1136 manpar = self.mandatoryparams
1136 manpar = self.mandatoryparams
1137 advpar = self.advisoryparams
1137 advpar = self.advisoryparams
1138 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1138 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1139 # size
1139 # size
1140 parsizes = []
1140 parsizes = []
1141 for key, value in manpar:
1141 for key, value in manpar:
1142 parsizes.append(len(key))
1142 parsizes.append(len(key))
1143 parsizes.append(len(value))
1143 parsizes.append(len(value))
1144 for key, value in advpar:
1144 for key, value in advpar:
1145 parsizes.append(len(key))
1145 parsizes.append(len(key))
1146 parsizes.append(len(value))
1146 parsizes.append(len(value))
1147 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1147 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1148 header.append(paramsizes)
1148 header.append(paramsizes)
1149 # key, value
1149 # key, value
1150 for key, value in manpar:
1150 for key, value in manpar:
1151 header.append(key)
1151 header.append(key)
1152 header.append(value)
1152 header.append(value)
1153 for key, value in advpar:
1153 for key, value in advpar:
1154 header.append(key)
1154 header.append(key)
1155 header.append(value)
1155 header.append(value)
1156 ## finalize header
1156 ## finalize header
1157 try:
1157 try:
1158 headerchunk = b''.join(header)
1158 headerchunk = b''.join(header)
1159 except TypeError:
1159 except TypeError:
1160 raise TypeError(
1160 raise TypeError(
1161 'Found a non-bytes trying to '
1161 'Found a non-bytes trying to '
1162 'build bundle part header: %r' % header
1162 'build bundle part header: %r' % header
1163 )
1163 )
1164 outdebug(ui, b'header chunk size: %i' % len(headerchunk))
1164 outdebug(ui, b'header chunk size: %i' % len(headerchunk))
1165 yield _pack(_fpartheadersize, len(headerchunk))
1165 yield _pack(_fpartheadersize, len(headerchunk))
1166 yield headerchunk
1166 yield headerchunk
1167 ## payload
1167 ## payload
1168 try:
1168 try:
1169 for chunk in self._payloadchunks():
1169 for chunk in self._payloadchunks():
1170 outdebug(ui, b'payload chunk size: %i' % len(chunk))
1170 outdebug(ui, b'payload chunk size: %i' % len(chunk))
1171 yield _pack(_fpayloadsize, len(chunk))
1171 yield _pack(_fpayloadsize, len(chunk))
1172 yield chunk
1172 yield chunk
1173 except GeneratorExit:
1173 except GeneratorExit:
1174 # GeneratorExit means that nobody is listening for our
1174 # GeneratorExit means that nobody is listening for our
1175 # results anyway, so just bail quickly rather than trying
1175 # results anyway, so just bail quickly rather than trying
1176 # to produce an error part.
1176 # to produce an error part.
1177 ui.debug(b'bundle2-generatorexit\n')
1177 ui.debug(b'bundle2-generatorexit\n')
1178 raise
1178 raise
1179 except BaseException as exc:
1179 except BaseException as exc:
1180 bexc = stringutil.forcebytestr(exc)
1180 bexc = stringutil.forcebytestr(exc)
1181 # backup exception data for later
1181 # backup exception data for later
1182 ui.debug(
1182 ui.debug(
1183 b'bundle2-input-stream-interrupt: encoding exception %s' % bexc
1183 b'bundle2-input-stream-interrupt: encoding exception %s' % bexc
1184 )
1184 )
1185 tb = sys.exc_info()[2]
1185 tb = sys.exc_info()[2]
1186 msg = b'unexpected error: %s' % bexc
1186 msg = b'unexpected error: %s' % bexc
1187 interpart = bundlepart(
1187 interpart = bundlepart(
1188 b'error:abort', [(b'message', msg)], mandatory=False
1188 b'error:abort', [(b'message', msg)], mandatory=False
1189 )
1189 )
1190 interpart.id = 0
1190 interpart.id = 0
1191 yield _pack(_fpayloadsize, -1)
1191 yield _pack(_fpayloadsize, -1)
1192 for chunk in interpart.getchunks(ui=ui):
1192 for chunk in interpart.getchunks(ui=ui):
1193 yield chunk
1193 yield chunk
1194 outdebug(ui, b'closing payload chunk')
1194 outdebug(ui, b'closing payload chunk')
1195 # abort current part payload
1195 # abort current part payload
1196 yield _pack(_fpayloadsize, 0)
1196 yield _pack(_fpayloadsize, 0)
1197 pycompat.raisewithtb(exc, tb)
1197 pycompat.raisewithtb(exc, tb)
1198 # end of payload
1198 # end of payload
1199 outdebug(ui, b'closing payload chunk')
1199 outdebug(ui, b'closing payload chunk')
1200 yield _pack(_fpayloadsize, 0)
1200 yield _pack(_fpayloadsize, 0)
1201 self._generated = True
1201 self._generated = True
1202
1202
1203 def _payloadchunks(self):
1203 def _payloadchunks(self):
1204 """yield chunks of a the part payload
1204 """yield chunks of a the part payload
1205
1205
1206 Exists to handle the different methods to provide data to a part."""
1206 Exists to handle the different methods to provide data to a part."""
1207 # we only support fixed size data now.
1207 # we only support fixed size data now.
1208 # This will be improved in the future.
1208 # This will be improved in the future.
1209 if util.safehasattr(self.data, 'next') or util.safehasattr(
1209 if util.safehasattr(self.data, 'next') or util.safehasattr(
1210 self.data, b'__next__'
1210 self.data, b'__next__'
1211 ):
1211 ):
1212 buff = util.chunkbuffer(self.data)
1212 buff = util.chunkbuffer(self.data)
1213 chunk = buff.read(preferedchunksize)
1213 chunk = buff.read(preferedchunksize)
1214 while chunk:
1214 while chunk:
1215 yield chunk
1215 yield chunk
1216 chunk = buff.read(preferedchunksize)
1216 chunk = buff.read(preferedchunksize)
1217 elif len(self.data):
1217 elif len(self.data):
1218 yield self.data
1218 yield self.data
1219
1219
1220
1220
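# A minimal, illustrative sketch (not used by this module) of the part-header
# parameter framing that ``bundlepart.getchunks`` emits and
# ``unbundlepart._readheader`` parses back: two uint8 counts (mandatory,
# advisory), one uint8 size per key and per value, then the raw key/value
# bytes. The '>BB' formats below are assumptions mirroring
# ``_fpartparamcount`` and ``_makefpartparamsizes`` defined earlier in this
# module; the ``_demo*`` names are purely hypothetical helpers.
import struct


def _demoencodeparams(manpar, advpar):
    """pack (name, value) pairs the way the part header does (sketch)"""
    allpar = list(manpar) + list(advpar)
    sizes = []
    for key, value in allpar:
        sizes.extend((len(key), len(value)))
    blob = struct.pack('>BB', len(manpar), len(advpar))
    blob += struct.pack('>' + 'BB' * len(allpar), *sizes)
    for key, value in allpar:
        blob += key + value
    return blob


def _demodecodeparams(blob):
    """inverse of _demoencodeparams, returning (manpar, advpar) (sketch)"""
    mancount, advcount = struct.unpack_from('>BB', blob, 0)
    offset = 2
    fmt = '>' + 'BB' * (mancount + advcount)
    sizes = struct.unpack_from(fmt, blob, offset)
    offset += struct.calcsize(fmt)
    pairs = []
    for ksize, vsize in zip(sizes[::2], sizes[1::2]):
        key = blob[offset : offset + ksize]
        offset += ksize
        value = blob[offset : offset + vsize]
        offset += vsize
        pairs.append((key, value))
    return pairs[:mancount], pairs[mancount:]
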
flaginterrupt = -1


class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting exceptions raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError(
                b'negative part header size: %i' % headersize
            )
        indebug(self.ui, b'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug(
            b'bundle2-input-stream-interrupt: opening out of band context\n'
        )
        indebug(self.ui, b'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, b'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        hardabort = False
        try:
            _processpart(op, part)
        except (SystemExit, KeyboardInterrupt):
            hardabort = True
            raise
        finally:
            if not hardabort:
                part.consume()
        self.ui.debug(
            b'bundle2-input-stream-interrupt: closing out of band context\n'
        )


class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError(b'no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable(b'no repo access from stream interruption')


def decodepayloadchunks(ui, fh):
    """Reads bundle2 part payload data into chunks.

    Part payload data consists of framed chunks. This function takes
    a file handle and emits those chunks.
    """
    dolog = ui.configbool(b'devel', b'bundle2.debug')
    debug = ui.debug

    headerstruct = struct.Struct(_fpayloadsize)
    headersize = headerstruct.size
    unpack = headerstruct.unpack

    readexactly = changegroup.readexactly
    read = fh.read

    chunksize = unpack(readexactly(fh, headersize))[0]
    indebug(ui, b'payload chunk size: %i' % chunksize)

    # changegroup.readexactly() is inlined below for performance.
    while chunksize:
        if chunksize >= 0:
            s = read(chunksize)
            if len(s) < chunksize:
                raise error.Abort(
                    _(
                        b'stream ended unexpectedly '
                        b' (got %d bytes, expected %d)'
                    )
                    % (len(s), chunksize)
                )

            yield s
        elif chunksize == flaginterrupt:
            # Interrupt "signal" detected. The regular stream is interrupted
            # and a bundle2 part follows. Consume it.
            interrupthandler(ui, fh)()
        else:
            raise error.BundleValueError(
                b'negative payload chunk size: %s' % chunksize
            )

        s = read(headersize)
        if len(s) < headersize:
            raise error.Abort(
                _(b'stream ended unexpectedly (got %d bytes, expected %d)')
                % (len(s), headersize)
            )

        chunksize = unpack(s)[0]

        # indebug() inlined for performance.
        if dolog:
            debug(b'bundle2-input: payload chunk size: %i\n' % chunksize)

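# A minimal, illustrative sketch (not used by this module) of the payload
# framing that ``decodepayloadchunks`` consumes: each chunk is preceded by a
# signed big-endian 32-bit size, a size of 0 terminates the payload, and a
# size of -1 (``flaginterrupt``) signals an out of band interrupt part. The
# '>i' format is an assumption mirroring ``_fpayloadsize`` defined earlier in
# this module; the ``_demo*`` names are purely hypothetical helpers.
import struct


def _demoframepayload(chunks):
    """frame an iterable of byte chunks as a bundle2 part payload (sketch)"""
    out = b''.join(struct.pack('>i', len(c)) + c for c in chunks)
    # a zero-size frame marks the end of the payload
    return out + struct.pack('>i', 0)


def _demoreadpayload(data):
    """parse framed chunks back until the 0-size terminator (sketch)"""
    chunks = []
    offset = 0
    while True:
        (size,) = struct.unpack_from('>i', data, offset)
        offset += 4
        if size == 0:
            return chunks
        if size < 0:
            # a real reader would hand -1 frames to interrupthandler
            raise ValueError('interrupt/invalid chunk size: %d' % size)
        chunks.append(data[offset : offset + size])
        offset += size
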
1351 class unbundlepart(unpackermixin):
1351 class unbundlepart(unpackermixin):
1352 """a bundle part read from a bundle"""
1352 """a bundle part read from a bundle"""
1353
1353
1354 def __init__(self, ui, header, fp):
1354 def __init__(self, ui, header, fp):
1355 super(unbundlepart, self).__init__(fp)
1355 super(unbundlepart, self).__init__(fp)
1356 self._seekable = util.safehasattr(fp, 'seek') and util.safehasattr(
1356 self._seekable = util.safehasattr(fp, 'seek') and util.safehasattr(
1357 fp, b'tell'
1357 fp, b'tell'
1358 )
1358 )
1359 self.ui = ui
1359 self.ui = ui
1360 # unbundle state attr
1360 # unbundle state attr
1361 self._headerdata = header
1361 self._headerdata = header
1362 self._headeroffset = 0
1362 self._headeroffset = 0
1363 self._initialized = False
1363 self._initialized = False
1364 self.consumed = False
1364 self.consumed = False
1365 # part data
1365 # part data
1366 self.id = None
1366 self.id = None
1367 self.type = None
1367 self.type = None
1368 self.mandatoryparams = None
1368 self.mandatoryparams = None
1369 self.advisoryparams = None
1369 self.advisoryparams = None
1370 self.params = None
1370 self.params = None
1371 self.mandatorykeys = ()
1371 self.mandatorykeys = ()
1372 self._readheader()
1372 self._readheader()
1373 self._mandatory = None
1373 self._mandatory = None
1374 self._pos = 0
1374 self._pos = 0
1375
1375
1376 def _fromheader(self, size):
1376 def _fromheader(self, size):
1377 """return the next <size> byte from the header"""
1377 """return the next <size> byte from the header"""
1378 offset = self._headeroffset
1378 offset = self._headeroffset
1379 data = self._headerdata[offset : (offset + size)]
1379 data = self._headerdata[offset : (offset + size)]
1380 self._headeroffset = offset + size
1380 self._headeroffset = offset + size
1381 return data
1381 return data
1382
1382
1383 def _unpackheader(self, format):
1383 def _unpackheader(self, format):
1384 """read given format from header
1384 """read given format from header
1385
1385
1386 This automatically compute the size of the format to read."""
1386 This automatically compute the size of the format to read."""
1387 data = self._fromheader(struct.calcsize(format))
1387 data = self._fromheader(struct.calcsize(format))
1388 return _unpack(format, data)
1388 return _unpack(format, data)
1389
1389
1390 def _initparams(self, mandatoryparams, advisoryparams):
1390 def _initparams(self, mandatoryparams, advisoryparams):
1391 """internal function to setup all logic related parameters"""
1391 """internal function to setup all logic related parameters"""
1392 # make it read only to prevent people touching it by mistake.
1392 # make it read only to prevent people touching it by mistake.
1393 self.mandatoryparams = tuple(mandatoryparams)
1393 self.mandatoryparams = tuple(mandatoryparams)
1394 self.advisoryparams = tuple(advisoryparams)
1394 self.advisoryparams = tuple(advisoryparams)
1395 # user friendly UI
1395 # user friendly UI
1396 self.params = util.sortdict(self.mandatoryparams)
1396 self.params = util.sortdict(self.mandatoryparams)
1397 self.params.update(self.advisoryparams)
1397 self.params.update(self.advisoryparams)
1398 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1398 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1399
1399
1400 def _readheader(self):
1400 def _readheader(self):
1401 """read the header and setup the object"""
1401 """read the header and setup the object"""
1402 typesize = self._unpackheader(_fparttypesize)[0]
1402 typesize = self._unpackheader(_fparttypesize)[0]
1403 self.type = self._fromheader(typesize)
1403 self.type = self._fromheader(typesize)
1404 indebug(self.ui, b'part type: "%s"' % self.type)
1404 indebug(self.ui, b'part type: "%s"' % self.type)
1405 self.id = self._unpackheader(_fpartid)[0]
1405 self.id = self._unpackheader(_fpartid)[0]
1406 indebug(self.ui, b'part id: "%s"' % pycompat.bytestr(self.id))
1406 indebug(self.ui, b'part id: "%s"' % pycompat.bytestr(self.id))
1407 # extract mandatory bit from type
1407 # extract mandatory bit from type
1408 self.mandatory = self.type != self.type.lower()
1408 self.mandatory = self.type != self.type.lower()
1409 self.type = self.type.lower()
1409 self.type = self.type.lower()
1410 ## reading parameters
1410 ## reading parameters
1411 # param count
1411 # param count
1412 mancount, advcount = self._unpackheader(_fpartparamcount)
1412 mancount, advcount = self._unpackheader(_fpartparamcount)
1413 indebug(self.ui, b'part parameters: %i' % (mancount + advcount))
1413 indebug(self.ui, b'part parameters: %i' % (mancount + advcount))
1414 # param size
1414 # param size
1415 fparamsizes = _makefpartparamsizes(mancount + advcount)
1415 fparamsizes = _makefpartparamsizes(mancount + advcount)
1416 paramsizes = self._unpackheader(fparamsizes)
1416 paramsizes = self._unpackheader(fparamsizes)
1417 # make it a list of couple again
1417 # make it a list of couple again
1418 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1418 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1419 # split mandatory from advisory
1419 # split mandatory from advisory
1420 mansizes = paramsizes[:mancount]
1420 mansizes = paramsizes[:mancount]
1421 advsizes = paramsizes[mancount:]
1421 advsizes = paramsizes[mancount:]
1422 # retrieve param value
1422 # retrieve param value
1423 manparams = []
1423 manparams = []
1424 for key, value in mansizes:
1424 for key, value in mansizes:
1425 manparams.append((self._fromheader(key), self._fromheader(value)))
1425 manparams.append((self._fromheader(key), self._fromheader(value)))
1426 advparams = []
1426 advparams = []
1427 for key, value in advsizes:
1427 for key, value in advsizes:
1428 advparams.append((self._fromheader(key), self._fromheader(value)))
1428 advparams.append((self._fromheader(key), self._fromheader(value)))
1429 self._initparams(manparams, advparams)
1429 self._initparams(manparams, advparams)
1430 ## part payload
1430 ## part payload
1431 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1431 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1432 # we read the data, tell it
1432 # we read the data, tell it
1433 self._initialized = True
1433 self._initialized = True
1434
1434
1435 def _payloadchunks(self):
1435 def _payloadchunks(self):
1436 """Generator of decoded chunks in the payload."""
1436 """Generator of decoded chunks in the payload."""
1437 return decodepayloadchunks(self.ui, self._fp)
1437 return decodepayloadchunks(self.ui, self._fp)
1438
1438
1439 def consume(self):
1439 def consume(self):
1440 """Read the part payload until completion.
1440 """Read the part payload until completion.
1441
1441
1442 By consuming the part data, the underlying stream read offset will
1442 By consuming the part data, the underlying stream read offset will
1443 be advanced to the next part (or end of stream).
1443 be advanced to the next part (or end of stream).
1444 """
1444 """
1445 if self.consumed:
1445 if self.consumed:
1446 return
1446 return
1447
1447
1448 chunk = self.read(32768)
1448 chunk = self.read(32768)
1449 while chunk:
1449 while chunk:
1450 self._pos += len(chunk)
1450 self._pos += len(chunk)
1451 chunk = self.read(32768)
1451 chunk = self.read(32768)
1452
1452
1453 def read(self, size=None):
1453 def read(self, size=None):
1454 """read payload data"""
1454 """read payload data"""
1455 if not self._initialized:
1455 if not self._initialized:
1456 self._readheader()
1456 self._readheader()
1457 if size is None:
1457 if size is None:
1458 data = self._payloadstream.read()
1458 data = self._payloadstream.read()
1459 else:
1459 else:
1460 data = self._payloadstream.read(size)
1460 data = self._payloadstream.read(size)
1461 self._pos += len(data)
1461 self._pos += len(data)
1462 if size is None or len(data) < size:
1462 if size is None or len(data) < size:
1463 if not self.consumed and self._pos:
1463 if not self.consumed and self._pos:
1464 self.ui.debug(
1464 self.ui.debug(
1465 b'bundle2-input-part: total payload size %i\n' % self._pos
1465 b'bundle2-input-part: total payload size %i\n' % self._pos
1466 )
1466 )
1467 self.consumed = True
1467 self.consumed = True
1468 return data
1468 return data
1469
1469
1470
1470
1471 class seekableunbundlepart(unbundlepart):
1471 class seekableunbundlepart(unbundlepart):
1472 """A bundle2 part in a bundle that is seekable.
1472 """A bundle2 part in a bundle that is seekable.
1473
1473
1474 Regular ``unbundlepart`` instances can only be read once. This class
1474 Regular ``unbundlepart`` instances can only be read once. This class
1475 extends ``unbundlepart`` to enable bi-directional seeking within the
1475 extends ``unbundlepart`` to enable bi-directional seeking within the
1476 part.
1476 part.
1477
1477
1478 Bundle2 part data consists of framed chunks. Offsets when seeking
1478 Bundle2 part data consists of framed chunks. Offsets when seeking
1479 refer to the decoded data, not the offsets in the underlying bundle2
1479 refer to the decoded data, not the offsets in the underlying bundle2
1480 stream.
1480 stream.
1481
1481
1482 To facilitate quickly seeking within the decoded data, instances of this
1482 To facilitate quickly seeking within the decoded data, instances of this
1483 class maintain a mapping between offsets in the underlying stream and
1483 class maintain a mapping between offsets in the underlying stream and
1484 the decoded payload. This mapping will consume memory in proportion
1484 the decoded payload. This mapping will consume memory in proportion
1485 to the number of chunks within the payload (which almost certainly
1485 to the number of chunks within the payload (which almost certainly
1486 increases in proportion with the size of the part).
1486 increases in proportion with the size of the part).
1487 """
1487 """
1488
1488
1489 def __init__(self, ui, header, fp):
1489 def __init__(self, ui, header, fp):
1490 # (payload, file) offsets for chunk starts.
1490 # (payload, file) offsets for chunk starts.
1491 self._chunkindex = []
1491 self._chunkindex = []

        super(seekableunbundlepart, self).__init__(ui, header, fp)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, b'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), (
                b'Unknown chunk %d' % chunknum
            )
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]

        for chunk in decodepayloadchunks(self.ui, self._fp):
            chunknum += 1
            pos += len(chunk)
            if chunknum == len(self._chunkindex):
                self._chunkindex.append((pos, self._tellfp()))

            yield chunk

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError(b'Unknown chunk')

    def tell(self):
        return self._pos

    def seek(self, offset, whence=os.SEEK_SET):
        if whence == os.SEEK_SET:
            newpos = offset
        elif whence == os.SEEK_CUR:
            newpos = self._pos + offset
        elif whence == os.SEEK_END:
            if not self.consumed:
                # Can't use self.consume() here because it advances self._pos.
                chunk = self.read(32768)
                while chunk:
                    chunk = self.read(32768)
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError(b'Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            # Can't use self.consume() here because it advances self._pos.
            chunk = self.read(32768)
            while chunk:
                chunk = self.read(32768)

        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError(b'Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_(b'Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_(b'File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None


# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {
    b'HG20': (),
    b'bookmarks': (),
    b'error': (b'abort', b'unsupportedcontent', b'pushraced', b'pushkey'),
    b'listkeys': (),
    b'pushkey': (),
    b'digests': tuple(sorted(util.DIGESTS.keys())),
    b'remote-changegroup': (b'http', b'https'),
    b'hgtagsfnodes': (),
    b'phases': (b'heads',),
    b'stream': (b'v2',),
}


def getrepocaps(repo, allowpushback=False, role=None):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.

    The returned value is used for servers advertising their capabilities as
    well as clients advertising their capabilities to servers as part of
    bundle2 requests. The ``role`` argument specifies which is which.
    """
    if role not in (b'client', b'server'):
        raise error.ProgrammingError(b'role argument must be client or server')

    caps = capabilities.copy()
    caps[b'changegroup'] = tuple(
        sorted(changegroup.supportedincomingversions(repo))
    )
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple(b'V%i' % v for v in obsolete.formats)
        caps[b'obsmarkers'] = supportedformat
    if allowpushback:
        caps[b'pushback'] = ()
    cpmode = repo.ui.config(b'server', b'concurrent-push-mode')
    if cpmode == b'check-related':
        caps[b'checkheads'] = (b'related',)
    if b'phases' in repo.ui.configlist(b'devel', b'legacy.exchange'):
        caps.pop(b'phases')

    # Don't advertise stream clone support in server mode if not configured.
    if role == b'server':
        streamsupported = repo.ui.configbool(
            b'server', b'uncompressed', untrusted=True
        )
        featuresupported = repo.ui.configbool(b'server', b'bundle2.stream')

        if not streamsupported or not featuresupported:
            caps.pop(b'stream')
    # Else always advertise support on client, because payload support
    # should always be advertised.

    # b'rev-branch-cache' is no longer advertised, but still supported
    # for legacy clients.

    return caps


def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable(b'bundle2')
    if not raw and raw != b'':
        return {}
    capsblob = urlreq.unquote(remote.capable(b'bundle2'))
    return decodecaps(capsblob)


def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict"""
    obscaps = caps.get(b'obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith(b'V')]

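`obsmarkersversion` above is pure parsing, so its behavior can be shown standalone. A minimal sketch (the capabilities dict literals are made-up examples, not taken from a real peer):

```python
# Standalone replica of the parsing in obsmarkersversion() above:
# obsmarker capability entries look like b'V0', b'V1', ...; the integer
# version numbers are extracted from the entries starting with b'V'.
def extract_obsmarker_versions(caps):
    obscaps = caps.get(b'obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith(b'V')]

# A peer advertising obsmarker formats V0 and V1:
print(extract_obsmarker_versions({b'obsmarkers': (b'V0', b'V1')}))  # [0, 1]
# A peer with no obsmarker support:
print(extract_obsmarker_versions({}))  # []
```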
def writenewbundle(
    ui,
    repo,
    source,
    filename,
    bundletype,
    outgoing,
    opts,
    vfs=None,
    compression=None,
    compopts=None,
):
    if bundletype.startswith(b'HG10'):
        cg = changegroup.makechangegroup(repo, outgoing, b'01', source)
        return writebundle(
            ui,
            cg,
            filename,
            bundletype,
            vfs=vfs,
            compression=compression,
            compopts=compopts,
        )
    elif not bundletype.startswith(b'HG20'):
        raise error.ProgrammingError(b'unknown bundle type: %s' % bundletype)

    caps = {}
    if b'obsolescence' in opts:
        caps[b'obsmarkers'] = (b'V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)


def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The type of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we might not always want a changegroup in such a bundle, for example in
    # stream bundles
    if opts.get(b'changegroup', True):
        cgversion = opts.get(b'cg.version')
        if cgversion is None:
            cgversion = changegroup.safeversion(repo)
        cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
        part = bundler.newpart(b'changegroup', data=cg.getchunks())
        part.addparam(b'version', cg.version)
        if b'clcount' in cg.extras:
            part.addparam(
                b'nbchanges', b'%d' % cg.extras[b'clcount'], mandatory=False
            )
        if opts.get(b'phases') and repo.revs(
            b'%ln and secret()', outgoing.ancestorsof
        ):
            part.addparam(
                b'targetphase', b'%d' % phases.secret, mandatory=False
            )
        if b'exp-sidedata-flag' in repo.requirements:
            part.addparam(b'exp-sidedata', b'1')

    if opts.get(b'streamv2', False):
        addpartbundlestream2(bundler, repo, stream=True)

    if opts.get(b'tagsfnodescache', True):
        addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get(b'revbranchcache', True):
        addpartrevbranchcache(repo, bundler, outgoing)

    if opts.get(b'obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(
            bundler,
            obsmarkers,
            mandatory=opts.get(b'obsolescence-mandatory', True),
        )

    if opts.get(b'phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = phases.binaryencode(headsbyphase)
        bundler.newpart(b'phase-heads', data=phasedata)


def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changeset
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.ancestorsof:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart(b'hgtagsfnodes', data=b''.join(chunks))


def addpartrevbranchcache(repo, bundler, outgoing):
    # we include the rev branch cache for the bundle changeset
    # (as an optional part)
    cache = repo.revbranchcache()
    cl = repo.unfiltered().changelog
    branchesdata = collections.defaultdict(lambda: (set(), set()))
    for node in outgoing.missing:
        branch, close = cache.branchinfo(cl.rev(node))
        branchesdata[branch][close].add(node)

    def generate():
        for branch, (nodes, closed) in sorted(branchesdata.items()):
            utf8branch = encoding.fromlocal(branch)
            yield rbcstruct.pack(len(utf8branch), len(nodes), len(closed))
            yield utf8branch
            for n in sorted(nodes):
                yield n
            for n in sorted(closed):
                yield n

    bundler.newpart(b'cache:rev-branch-cache', data=generate(), mandatory=False)


def _formatrequirementsspec(requirements):
    requirements = [req for req in requirements if req != b"shared"]
    return urlreq.quote(b','.join(sorted(requirements)))


def _formatrequirementsparams(requirements):
    requirements = _formatrequirementsspec(requirements)
    params = b"%s%s" % (urlreq.quote(b"requirements="), requirements)
    return params

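The two helpers above percent-encode the requirements list for transmission. A standalone sketch using the stdlib `quote` (the module's `urlreq` is a py2/py3 compatibility shim around the same function; the requirement names below are illustrative):

```python
# Sketch of _formatrequirementsspec() above with the stdlib quote():
# 'shared' is filtered out, the rest are sorted, comma-joined and
# percent-encoded (the comma becomes %2C).
from urllib.parse import quote

def format_requirements_spec(requirements):
    # the 'shared' requirement is local-only and never transmitted
    requirements = [req for req in requirements if req != "shared"]
    return quote(','.join(sorted(requirements)))

print(format_requirements_spec(["store", "revlogv1", "shared"]))
# revlogv1%2Cstore
```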
def format_remote_wanted_sidedata(repo):
    """Formats a repo's wanted sidedata categories into a bytestring for
    capabilities exchange."""
    wanted = b""
    if repo._wanted_sidedata:
        wanted = b','.join(
            pycompat.bytestr(c) for c in sorted(repo._wanted_sidedata)
        )
    return wanted


def read_remote_wanted_sidedata(remote):
    sidedata_categories = remote.capable(b'exp-wanted-sidedata')
    return read_wanted_sidedata(sidedata_categories)


def read_wanted_sidedata(formatted):
    if formatted:
        return set(formatted.split(b','))
    return set()

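`format_remote_wanted_sidedata` and `read_wanted_sidedata` above form an encode/decode pair: categories are sorted and comma-joined for the capability string, then split back into a set on receipt. A self-contained sketch of that round-trip (the category names are hypothetical):

```python
# Encode/decode pair mirroring format_remote_wanted_sidedata() and
# read_wanted_sidedata() above; the b'cat-*' names are made up.
def format_wanted(categories):
    # empty set -> empty capability string
    if not categories:
        return b""
    return b','.join(sorted(categories))

def read_wanted(formatted):
    if formatted:
        return set(formatted.split(b','))
    return set()

wire = format_wanted({b'cat-b', b'cat-a'})
print(wire)  # b'cat-a,cat-b'
assert read_wanted(wire) == {b'cat-a', b'cat-b'}
assert read_wanted(b'') == set()
```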
def addpartbundlestream2(bundler, repo, **kwargs):
    if not kwargs.get('stream', False):
        return

    if not streamclone.allowservergeneration(repo):
        raise error.Abort(
            _(
                b'stream data requested but server does not allow '
                b'this feature'
            ),
            hint=_(
                b'well-behaved clients should not be '
                b'requesting stream data from servers not '
                b'advertising it; the client may be buggy'
            ),
        )

    # Stream clones don't compress well. And compression undermines a
    # goal of stream clones, which is to be fast. Communicate the desire
    # to avoid compression to consumers of the bundle.
    bundler.prefercompressed = False

    # get the includes and excludes
    includepats = kwargs.get('includepats')
    excludepats = kwargs.get('excludepats')

    narrowstream = repo.ui.configbool(
        b'experimental', b'server.stream-narrow-clones'
    )

    if (includepats or excludepats) and not narrowstream:
        raise error.Abort(_(b'server does not support narrow stream clones'))

    includeobsmarkers = False
    if repo.obsstore:
        remoteversions = obsmarkersversion(bundler.capabilities)
        if not remoteversions:
            raise error.Abort(
                _(
                    b'server has obsolescence markers, but client '
                    b'cannot receive them via stream clone'
                )
            )
        elif repo.obsstore._version in remoteversions:
            includeobsmarkers = True

    filecount, bytecount, it = streamclone.generatev2(
        repo, includepats, excludepats, includeobsmarkers
    )
    requirements = _formatrequirementsspec(repo.requirements)
    part = bundler.newpart(b'stream2', data=it)
    part.addparam(b'bytecount', b'%d' % bytecount, mandatory=True)
    part.addparam(b'filecount', b'%d' % filecount, mandatory=True)
    part.addparam(b'requirements', requirements, mandatory=True)


def buildobsmarkerspart(bundler, markers, mandatory=True):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError(b'bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart(b'obsmarkers', data=stream, mandatory=mandatory)


def writebundle(
    ui, cg, filename, bundletype, vfs=None, compression=None, compopts=None
):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == b"HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart(b'changegroup', data=cg.getchunks())
        part.addparam(b'version', cg.version)
        if b'clcount' in cg.extras:
            part.addparam(
                b'nbchanges', b'%d' % cg.extras[b'clcount'], mandatory=False
            )
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != b'01':
            raise error.Abort(
                _(b'old bundle types only support v1 changegroups')
            )
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_(b'unknown stream compression type: %s') % comp)
        compengine = util.compengines.forbundletype(comp)

        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk

        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)


def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get(b'return', 0) for r in op.records[b'changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

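The encoding that `combinechangegroupresults` folds together can be exercised standalone: a return value n > 1 means n - 1 heads were added, n < -1 means heads were removed, 0 signals a failed application, and 1 means no head change. A sketch operating on a plain list instead of bundle operation records:

```python
# Standalone replica of the arithmetic in combinechangegroupresults()
# above, taking a plain list of addchangegroup return values instead of
# bundle operation records.
def combine(results):
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

print(combine([2, 3]))  # 4: one head added, then two more
print(combine([-2]))    # -2: one head removed
print(combine([]))      # 1: nothing applied, no head change
```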
@parthandler(
    b'changegroup',
    (
        b'version',
        b'nbchanges',
        b'exp-sidedata',
        b'exp-wanted-sidedata',
        b'treemanifest',
        b'targetphase',
    ),
)
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo"""
    from . import localrepo

    tr = op.gettransaction()
    unpackerversion = inpart.params.get(b'version', b'01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the one contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if b'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get(b'nbchanges'))
    if b'treemanifest' in inpart.params and not scmutil.istreemanifest(op.repo):
        if len(op.repo.changelog) != 0:
            raise error.Abort(
                _(
                    b"bundle contains tree manifests, but local repo is "
                    b"non-empty and does not use tree manifests"
                )
            )
        op.repo.requirements.add(requirements.TREEMANIFEST_REQUIREMENT)
        op.repo.svfs.options = localrepo.resolvestorevfsoptions(
            op.repo.ui, op.repo.requirements, op.repo.features
        )
        scmutil.writereporequirements(op.repo)

    extrakwargs = {}
    targetphase = inpart.params.get(b'targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)

    remote_sidedata = inpart.params.get(b'exp-wanted-sidedata')
    extrakwargs['sidedata_categories'] = read_wanted_sidedata(remote_sidedata)

    ret = _processchangegroup(
        op,
        cg,
        tr,
        op.source,
        b'bundle2',
        expectedtotal=nbchangesets,
        **extrakwargs
    )
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart(b'reply:changegroup', mandatory=False)
        part.addparam(
            b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
        )
        part.addparam(b'return', b'%i' % ret, mandatory=False)
    assert not inpart.read()


_remotechangegroupparams = tuple(
    [b'url', b'size', b'digests']
    + [b'digest:%s' % k for k in util.DIGESTS.keys()]
)


@parthandler(b'remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate that what was
        retrieved by the client matches what the server knows about the
        bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest
        with that name. Like the size, it is used to validate that what was
        retrieved by the client matches what the server knows about the
        bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params[b'url']
    except KeyError:
        raise error.Abort(_(b'remote-changegroup: missing "%s" param') % b'url')
    parsed_url = urlutil.url(raw_url)
    if parsed_url.scheme not in capabilities[b'remote-changegroup']:
        raise error.Abort(
            _(b'remote-changegroup does not support %s urls')
            % parsed_url.scheme
        )

    try:
        size = int(inpart.params[b'size'])
    except ValueError:
        raise error.Abort(
            _(b'remote-changegroup: invalid value for param "%s"') % b'size'
        )
    except KeyError:
        raise error.Abort(
            _(b'remote-changegroup: missing "%s" param') % b'size'
        )

    digests = {}
    for typ in inpart.params.get(b'digests', b'').split():
        param = b'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(
                _(b'remote-changegroup: missing "%s" param') % param
            )
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange

    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(
            _(b'%s: not a bundle version 1.0') % urlutil.hidepassword(raw_url)
        )
    ret = _processchangegroup(op, cg, tr, op.source, b'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart(b'reply:changegroup')
        part.addparam(
            b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
        )
        part.addparam(b'return', b'%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(
            _(b'bundle at %s is corrupted:\n%s')
            % (urlutil.hidepassword(raw_url), e.message)
        )
    assert not inpart.read()


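The size/digest validation that `util.digestchecker` performs on the fetched bundle can be sketched in isolation. The snippet below is a minimal, hypothetical `validate` helper (not Mercurial's implementation): the downloaded bytes must match both the advertised byte size and every advertised digest.

```python
import hashlib

# Hypothetical sketch of the remote-changegroup validation: the bundle
# bytes must match the advertised size and all advertised digests.
def validate(data, size, digests):
    if len(data) != size:
        raise ValueError('size mismatch')
    for typ, expected in digests.items():
        h = hashlib.new(typ.decode())  # e.g. b'sha1' -> 'sha1'
        h.update(data)
        if h.hexdigest().encode() != expected:
            raise ValueError('digest mismatch: %r' % typ)


data = b'bundle-bytes'
# passes: correct size and matching sha1 digest
validate(data, len(data), {b'sha1': hashlib.sha1(data).hexdigest().encode()})
```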
@parthandler(b'reply:changegroup', (b'return', b'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params[b'return'])
    replyto = int(inpart.params[b'in-reply-to'])
    op.records.add(b'changegroup', {b'return': ret}, replyto)


@parthandler(b'check:bookmarks')
def handlecheckbookmarks(op, inpart):
    """check location of bookmarks

    This part is used to detect push races regarding bookmarks. It contains
    binary encoded (bookmark, node) tuples. If the local state does not
    match the one in the part, a PushRaced exception is raised.
    """
    bookdata = bookmarks.binarydecode(op.repo, inpart)

    msgstandard = (
        b'remote repository changed while pushing - please try again '
        b'(bookmark "%s" move from %s to %s)'
    )
    msgmissing = (
        b'remote repository changed while pushing - please try again '
        b'(bookmark "%s" is missing, expected %s)'
    )
    msgexist = (
        b'remote repository changed while pushing - please try again '
        b'(bookmark "%s" set on %s, expected missing)'
    )
    for book, node in bookdata:
        currentnode = op.repo._bookmarks.get(book)
        if currentnode != node:
            if node is None:
                finalmsg = msgexist % (book, short(currentnode))
            elif currentnode is None:
                finalmsg = msgmissing % (book, short(node))
            else:
                finalmsg = msgstandard % (
                    book,
                    short(node),
                    short(currentnode),
                )
            raise error.PushRaced(finalmsg)


@parthandler(b'check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced(
            b'remote repository changed while pushing - please try again'
        )


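The fixed-size record loop used by 'check:heads' (and 'check:updated-heads') can be demonstrated on its own: read 20-byte nodes until the stream is exhausted, where a partial trailing record would indicate a malformed part. The `readheads` helper below is a hypothetical standalone sketch of that loop.

```python
import io

# Sketch of the 'check:heads' payload parser: a sequence of 20-byte
# node records, with no partial record allowed at the end.
def readheads(fh):
    heads = []
    h = fh.read(20)
    while len(h) == 20:
        heads.append(h)
        h = fh.read(20)
    assert not h  # a leftover short read means a truncated payload
    return heads


payload = b'a' * 20 + b'b' * 20
assert readheads(io.BytesIO(payload)) == [b'a' * 20, b'b' * 20]
```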
@parthandler(b'check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for races on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually
    updated during the push. If other activity happens on unrelated heads,
    it is ignored.

    This allows servers with high traffic to avoid push contention as long
    as only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().iterheads():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced(
                b'remote repository changed while pushing - '
                b'please try again'
            )


@parthandler(b'check:phases')
def handlecheckphases(op, inpart):
    """check that phase boundaries of the repository did not change

    This is used to detect a push race.
    """
    phasetonodes = phases.binarydecode(inpart)
    unfi = op.repo.unfiltered()
    cl = unfi.changelog
    phasecache = unfi._phasecache
    msg = (
        b'remote repository changed while pushing - please try again '
        b'(%s is %s expected %s)'
    )
    for expectedphase, nodes in pycompat.iteritems(phasetonodes):
        for n in nodes:
            actualphase = phasecache.phase(unfi, cl.rev(n))
            if actualphase != expectedphase:
                finalmsg = msg % (
                    short(n),
                    phases.phasenames[actualphase],
                    phases.phasenames[expectedphase],
                )
                raise error.PushRaced(finalmsg)


@parthandler(b'output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_(b'remote: %s\n') % line)


@parthandler(b'replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)


class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""


@parthandler(b'error:abort', (b'message', b'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(
        inpart.params[b'message'], hint=inpart.params.get(b'hint')
    )


@parthandler(
    b'error:pushkey',
    (b'namespace', b'key', b'new', b'old', b'ret', b'in-reply-to'),
)
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in (b'namespace', b'key', b'new', b'old', b'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(
        inpart.params[b'in-reply-to'], **pycompat.strkwargs(kwargs)
    )


@parthandler(b'error:unsupportedcontent', (b'parttype', b'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get(b'parttype')
    if parttype is not None:
        kwargs[b'parttype'] = parttype
    params = inpart.params.get(b'params')
    if params is not None:
        kwargs[b'params'] = params.split(b'\0')

    raise error.BundleUnknownFeatureError(**pycompat.strkwargs(kwargs))


@parthandler(b'error:pushraced', (b'message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_(b'push failed:'), inpart.params[b'message'])


@parthandler(b'listkeys', (b'namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params[b'namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add(b'listkeys', (namespace, r))


@parthandler(b'pushkey', (b'namespace', b'key', b'old', b'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params[b'namespace'])
    key = dec(inpart.params[b'key'])
    old = dec(inpart.params[b'old'])
    new = dec(inpart.params[b'new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {b'namespace': namespace, b'key': key, b'old': old, b'new': new}
    op.records.add(b'pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart(b'reply:pushkey')
        rpart.addparam(
            b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
        )
        rpart.addparam(b'return', b'%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in (b'namespace', b'key', b'new', b'old', b'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(
            partid=b'%d' % inpart.id, **pycompat.strkwargs(kwargs)
        )


@parthandler(b'bookmarks')
def handlebookmark(op, inpart):
    """transmit bookmark information

    The part contains binary encoded bookmark information.

    The exact behavior of this part can be controlled by the 'bookmarks' mode
    on the bundle operation.

    When mode is 'apply' (the default) the bookmark information is applied as
    is to the unbundling repository. Make sure a 'check:bookmarks' part is
    issued earlier to check for push races in such update. This behavior is
    suitable for pushing.

    When mode is 'records', the information is recorded into the 'bookmarks'
    records of the bundle operation. This behavior is suitable for pulling.
    """
    changes = bookmarks.binarydecode(op.repo, inpart)

    pushkeycompat = op.repo.ui.configbool(
        b'server', b'bookmarks-pushkey-compat'
    )
    bookmarksmode = op.modes.get(b'bookmarks', b'apply')

    if bookmarksmode == b'apply':
        tr = op.gettransaction()
        bookstore = op.repo._bookmarks
        if pushkeycompat:
            allhooks = []
            for book, node in changes:
                hookargs = tr.hookargs.copy()
                hookargs[b'pushkeycompat'] = b'1'
                hookargs[b'namespace'] = b'bookmarks'
                hookargs[b'key'] = book
                hookargs[b'old'] = hex(bookstore.get(book, b''))
                hookargs[b'new'] = hex(node if node is not None else b'')
                allhooks.append(hookargs)

            for hookargs in allhooks:
                op.repo.hook(
                    b'prepushkey', throw=True, **pycompat.strkwargs(hookargs)
                )

        for book, node in changes:
            if bookmarks.isdivergent(book):
                msg = _(b'cannot accept divergent bookmark %s!') % book
                raise error.Abort(msg)

        bookstore.applychanges(op.repo, op.gettransaction(), changes)

        if pushkeycompat:

            def runhook(unused_success):
                for hookargs in allhooks:
                    op.repo.hook(b'pushkey', **pycompat.strkwargs(hookargs))

            op.repo._afterlock(runhook)

    elif bookmarksmode == b'records':
        for book, node in changes:
            record = {b'bookmark': book, b'node': node}
            op.records.add(b'bookmarks', record)
    else:
        raise error.ProgrammingError(
            b'unknown bookmark mode: %s' % bookmarksmode
        )


@parthandler(b'phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = phases.binarydecode(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)


@parthandler(b'reply:pushkey', (b'return', b'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params[b'return'])
    partid = int(inpart.params[b'in-reply-to'])
    op.records.add(b'pushkey', {b'return': ret}, partid)


@parthandler(b'obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config(b'experimental', b'obsmarkers-exchange-debug'):
        op.ui.writenoi18n(
            b'obsmarker-exchange: %i bytes received\n' % len(markerdata)
        )
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug(
            b'ignoring obsolescence markers, feature not enabled\n'
        )
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    op.records.add(b'obsmarkers', {b'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart(b'reply:obsmarkers')
        rpart.addparam(
            b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
        )
        rpart.addparam(b'new', b'%i' % new, mandatory=False)


@parthandler(b'reply:obsmarkers', (b'new', b'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params[b'new'])
    partid = int(inpart.params[b'in-reply-to'])
    op.records.add(b'obsmarkers', {b'new': ret}, partid)


@parthandler(b'hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug(b'ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug(b'applied %i hgtags fnodes cache entries\n' % count)


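As a hypothetical, standalone sketch (not part of this module), the `.hgtags` fnodes payload consumed by the handler above is just a flat sequence of fixed-width (changeset node, filenode) pairs, 20 bytes each; the `build_payload`/`parse_payload` helpers and the placeholder node values are illustrative inventions:

```python
# Illustrative only: pack and re-read a payload of 20-byte node/fnode pairs,
# mirroring the read(20)/read(20) loop in handlehgtagsfnodes.
import io

def build_payload(pairs):
    # concatenate the fixed-width pairs back to back
    return b''.join(node + fnode for node, fnode in pairs)

def parse_payload(data):
    stream = io.BytesIO(data)
    pairs = []
    while True:
        node = stream.read(20)
        fnode = stream.read(20)
        if len(node) < 20 or len(fnode) < 20:
            # incomplete trailing data is ignored, as in the handler
            break
        pairs.append((node, fnode))
    return pairs

pairs = [(b'\x01' * 20, b'\x02' * 20), (b'\x03' * 20, b'\x04' * 20)]
assert parse_payload(build_payload(pairs)) == pairs
```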
rbcstruct = struct.Struct(b'>III')


@parthandler(b'cache:rev-branch-cache')
def handlerbc(op, inpart):
    """Legacy part, ignored for compatibility with bundles from or
    for Mercurial before 5.7. Newer Mercurial computes the cache
    efficiently enough during unbundling that the additional transfer
    is unnecessary."""


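A hypothetical sketch of what `rbcstruct` describes: each legacy rev-branch-cache record was three big-endian unsigned 32-bit integers (12 bytes). The field names and values below are illustrative placeholders, not the actual cache semantics:

```python
# Illustrative only: round-trip one 12-byte record through the b'>III'
# layout used by rbcstruct above.
import struct

rbcstruct = struct.Struct(b'>III')
record = rbcstruct.pack(42, 7, 1)
assert rbcstruct.size == 12 and len(record) == 12
fields = rbcstruct.unpack(record)
assert fields == (42, 7, 1)
```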
@parthandler(b'pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on server-side for security reasons
    if op.ui.configbool(b'push', b'pushvars.server'):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvar flag.
            key = b"USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)


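The key transformation in `bundle2getvars` above can be sketched in isolation; the `to_hookargs` helper is a hypothetical name introduced only for this example:

```python
# Illustrative only: advisory params become hook arguments with an
# upper-cased, USERVAR_-prefixed key, as in bundle2getvars.
def to_hookargs(advisoryparams):
    return {b"USERVAR_" + key.upper(): value for key, value in advisoryparams}

assert to_hookargs([(b'debug', b'1')]) == {b'USERVAR_DEBUG': b'1'}
```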
@parthandler(b'stream2', (b'requirements', b'filecount', b'bytecount'))
def handlestreamv2bundle(op, part):

    requirements = urlreq.unquote(part.params[b'requirements']).split(b',')
    filecount = int(part.params[b'filecount'])
    bytecount = int(part.params[b'bytecount'])

    repo = op.repo
    if len(repo):
        msg = _(b'cannot apply stream clone to non empty repository')
        raise error.Abort(msg)

    repo.ui.debug(b'applying stream bundle\n')
    streamclone.applybundlev2(repo, part, filecount, bytecount, requirements)


def widen_bundle(
    bundler, repo, oldmatcher, newmatcher, common, known, cgversion, ellipses
):
    """generates bundle2 for widening a narrow clone

    bundler is the bundle to which data should be added
    repo is the localrepository instance
    oldmatcher matches what the client already has
    newmatcher matches what the client needs (including what it already has)
    common is set of common heads between server and client
    known is a set of revs known on the client side (used in ellipses)
    cgversion is the changegroup version to send
    ellipses is boolean value telling whether to send ellipses data or not

    returns bundle2 of the data required for extending
    """
    commonnodes = set()
    cl = repo.changelog
    for r in repo.revs(b"::%ln", common):
        commonnodes.add(cl.node(r))
    if commonnodes:
        packer = changegroup.getbundler(
            cgversion,
            repo,
            oldmatcher=oldmatcher,
            matcher=newmatcher,
            fullnodes=commonnodes,
        )
        cgdata = packer.generate(
            {repo.nullid},
            list(commonnodes),
            False,
            b'narrow_widen',
            changelog=False,
        )

        part = bundler.newpart(b'changegroup', data=cgdata)
        part.addparam(b'version', cgversion)
        if scmutil.istreemanifest(repo):
            part.addparam(b'treemanifest', b'1')
        if b'exp-sidedata-flag' in repo.requirements:
            part.addparam(b'exp-sidedata', b'1')
            wanted = format_remote_wanted_sidedata(repo)
            part.addparam(b'exp-wanted-sidedata', wanted)

    return bundler
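`widen_bundle` above first collects every node reachable from the common heads (the `b"::%ln"` revset). A hypothetical sketch of that same ancestor closure over a toy parent-mapping DAG, with an invented `ancestors` helper standing in for the revset query:

```python
# Illustrative only: compute the ancestor closure of a set of heads over a
# simple {node: [parent, ...]} DAG, like repo.revs(b"::%ln", common) does.
def ancestors(parents, heads):
    seen = set()
    stack = list(heads)
    while stack:
        n = stack.pop()
        if n in seen:
            continue
        seen.add(n)
        stack.extend(parents.get(n, ()))
    return seen

parents = {'d': ['b', 'c'], 'b': ['a'], 'c': ['a'], 'a': []}
assert ancestors(parents, ['d']) == {'a', 'b', 'c', 'd'}
```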