##// END OF EJS Templates
bundle2: drop some outdated comment...
marmoute -
r46781:c511fef3 default
parent child Browse files
Show More
@@ -1,2592 +1,2588
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part header. When the header is empty
69 The total number of Bytes used by the part header. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 from __future__ import absolute_import, division
148 from __future__ import absolute_import, division
149
149
150 import collections
150 import collections
151 import errno
151 import errno
152 import os
152 import os
153 import re
153 import re
154 import string
154 import string
155 import struct
155 import struct
156 import sys
156 import sys
157
157
158 from .i18n import _
158 from .i18n import _
159 from .node import (
159 from .node import (
160 hex,
160 hex,
161 nullid,
161 nullid,
162 short,
162 short,
163 )
163 )
164 from . import (
164 from . import (
165 bookmarks,
165 bookmarks,
166 changegroup,
166 changegroup,
167 encoding,
167 encoding,
168 error,
168 error,
169 obsolete,
169 obsolete,
170 phases,
170 phases,
171 pushkey,
171 pushkey,
172 pycompat,
172 pycompat,
173 requirements,
173 requirements,
174 scmutil,
174 scmutil,
175 streamclone,
175 streamclone,
176 tags,
176 tags,
177 url,
177 url,
178 util,
178 util,
179 )
179 )
180 from .utils import stringutil
180 from .utils import stringutil
181
181
182 urlerr = util.urlerr
182 urlerr = util.urlerr
183 urlreq = util.urlreq
183 urlreq = util.urlreq
184
184
185 _pack = struct.pack
185 _pack = struct.pack
186 _unpack = struct.unpack
186 _unpack = struct.unpack
187
187
188 _fstreamparamsize = b'>i'
188 _fstreamparamsize = b'>i'
189 _fpartheadersize = b'>i'
189 _fpartheadersize = b'>i'
190 _fparttypesize = b'>B'
190 _fparttypesize = b'>B'
191 _fpartid = b'>I'
191 _fpartid = b'>I'
192 _fpayloadsize = b'>i'
192 _fpayloadsize = b'>i'
193 _fpartparamcount = b'>BB'
193 _fpartparamcount = b'>BB'
194
194
195 preferedchunksize = 32768
195 preferedchunksize = 32768
196
196
197 _parttypeforbidden = re.compile(b'[^a-zA-Z0-9_:-]')
197 _parttypeforbidden = re.compile(b'[^a-zA-Z0-9_:-]')
198
198
199
199
200 def outdebug(ui, message):
200 def outdebug(ui, message):
201 """debug regarding output stream (bundling)"""
201 """debug regarding output stream (bundling)"""
202 if ui.configbool(b'devel', b'bundle2.debug'):
202 if ui.configbool(b'devel', b'bundle2.debug'):
203 ui.debug(b'bundle2-output: %s\n' % message)
203 ui.debug(b'bundle2-output: %s\n' % message)
204
204
205
205
206 def indebug(ui, message):
206 def indebug(ui, message):
207 """debug on input stream (unbundling)"""
207 """debug on input stream (unbundling)"""
208 if ui.configbool(b'devel', b'bundle2.debug'):
208 if ui.configbool(b'devel', b'bundle2.debug'):
209 ui.debug(b'bundle2-input: %s\n' % message)
209 ui.debug(b'bundle2-input: %s\n' % message)
210
210
211
211
212 def validateparttype(parttype):
212 def validateparttype(parttype):
213 """raise ValueError if a parttype contains invalid character"""
213 """raise ValueError if a parttype contains invalid character"""
214 if _parttypeforbidden.search(parttype):
214 if _parttypeforbidden.search(parttype):
215 raise ValueError(parttype)
215 raise ValueError(parttype)
216
216
217
217
218 def _makefpartparamsizes(nbparams):
218 def _makefpartparamsizes(nbparams):
219 """return a struct format to read part parameter sizes
219 """return a struct format to read part parameter sizes
220
220
221 The number parameters is variable so we need to build that format
221 The number parameters is variable so we need to build that format
222 dynamically.
222 dynamically.
223 """
223 """
224 return b'>' + (b'BB' * nbparams)
224 return b'>' + (b'BB' * nbparams)
225
225
226
226
227 parthandlermapping = {}
227 parthandlermapping = {}
228
228
229
229
230 def parthandler(parttype, params=()):
230 def parthandler(parttype, params=()):
231 """decorator that register a function as a bundle2 part handler
231 """decorator that register a function as a bundle2 part handler
232
232
233 eg::
233 eg::
234
234
235 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
235 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
236 def myparttypehandler(...):
236 def myparttypehandler(...):
237 '''process a part of type "my part".'''
237 '''process a part of type "my part".'''
238 ...
238 ...
239 """
239 """
240 validateparttype(parttype)
240 validateparttype(parttype)
241
241
242 def _decorator(func):
242 def _decorator(func):
243 lparttype = parttype.lower() # enforce lower case matching.
243 lparttype = parttype.lower() # enforce lower case matching.
244 assert lparttype not in parthandlermapping
244 assert lparttype not in parthandlermapping
245 parthandlermapping[lparttype] = func
245 parthandlermapping[lparttype] = func
246 func.params = frozenset(params)
246 func.params = frozenset(params)
247 return func
247 return func
248
248
249 return _decorator
249 return _decorator
250
250
251
251
252 class unbundlerecords(object):
252 class unbundlerecords(object):
253 """keep record of what happens during and unbundle
253 """keep record of what happens during and unbundle
254
254
255 New records are added using `records.add('cat', obj)`. Where 'cat' is a
255 New records are added using `records.add('cat', obj)`. Where 'cat' is a
256 category of record and obj is an arbitrary object.
256 category of record and obj is an arbitrary object.
257
257
258 `records['cat']` will return all entries of this category 'cat'.
258 `records['cat']` will return all entries of this category 'cat'.
259
259
260 Iterating on the object itself will yield `('category', obj)` tuples
260 Iterating on the object itself will yield `('category', obj)` tuples
261 for all entries.
261 for all entries.
262
262
263 All iterations happens in chronological order.
263 All iterations happens in chronological order.
264 """
264 """
265
265
266 def __init__(self):
266 def __init__(self):
267 self._categories = {}
267 self._categories = {}
268 self._sequences = []
268 self._sequences = []
269 self._replies = {}
269 self._replies = {}
270
270
271 def add(self, category, entry, inreplyto=None):
271 def add(self, category, entry, inreplyto=None):
272 """add a new record of a given category.
272 """add a new record of a given category.
273
273
274 The entry can then be retrieved in the list returned by
274 The entry can then be retrieved in the list returned by
275 self['category']."""
275 self['category']."""
276 self._categories.setdefault(category, []).append(entry)
276 self._categories.setdefault(category, []).append(entry)
277 self._sequences.append((category, entry))
277 self._sequences.append((category, entry))
278 if inreplyto is not None:
278 if inreplyto is not None:
279 self.getreplies(inreplyto).add(category, entry)
279 self.getreplies(inreplyto).add(category, entry)
280
280
281 def getreplies(self, partid):
281 def getreplies(self, partid):
282 """get the records that are replies to a specific part"""
282 """get the records that are replies to a specific part"""
283 return self._replies.setdefault(partid, unbundlerecords())
283 return self._replies.setdefault(partid, unbundlerecords())
284
284
285 def __getitem__(self, cat):
285 def __getitem__(self, cat):
286 return tuple(self._categories.get(cat, ()))
286 return tuple(self._categories.get(cat, ()))
287
287
288 def __iter__(self):
288 def __iter__(self):
289 return iter(self._sequences)
289 return iter(self._sequences)
290
290
291 def __len__(self):
291 def __len__(self):
292 return len(self._sequences)
292 return len(self._sequences)
293
293
294 def __nonzero__(self):
294 def __nonzero__(self):
295 return bool(self._sequences)
295 return bool(self._sequences)
296
296
297 __bool__ = __nonzero__
297 __bool__ = __nonzero__
298
298
299
299
300 class bundleoperation(object):
300 class bundleoperation(object):
301 """an object that represents a single bundling process
301 """an object that represents a single bundling process
302
302
303 Its purpose is to carry unbundle-related objects and states.
303 Its purpose is to carry unbundle-related objects and states.
304
304
305 A new object should be created at the beginning of each bundle processing.
305 A new object should be created at the beginning of each bundle processing.
306 The object is to be returned by the processing function.
306 The object is to be returned by the processing function.
307
307
308 The object has very little content now it will ultimately contain:
308 The object has very little content now it will ultimately contain:
309 * an access to the repo the bundle is applied to,
309 * an access to the repo the bundle is applied to,
310 * a ui object,
310 * a ui object,
311 * a way to retrieve a transaction to add changes to the repo,
311 * a way to retrieve a transaction to add changes to the repo,
312 * a way to record the result of processing each part,
312 * a way to record the result of processing each part,
313 * a way to construct a bundle response when applicable.
313 * a way to construct a bundle response when applicable.
314 """
314 """
315
315
316 def __init__(self, repo, transactiongetter, captureoutput=True, source=b''):
316 def __init__(self, repo, transactiongetter, captureoutput=True, source=b''):
317 self.repo = repo
317 self.repo = repo
318 self.ui = repo.ui
318 self.ui = repo.ui
319 self.records = unbundlerecords()
319 self.records = unbundlerecords()
320 self.reply = None
320 self.reply = None
321 self.captureoutput = captureoutput
321 self.captureoutput = captureoutput
322 self.hookargs = {}
322 self.hookargs = {}
323 self._gettransaction = transactiongetter
323 self._gettransaction = transactiongetter
324 # carries value that can modify part behavior
324 # carries value that can modify part behavior
325 self.modes = {}
325 self.modes = {}
326 self.source = source
326 self.source = source
327
327
328 def gettransaction(self):
328 def gettransaction(self):
329 transaction = self._gettransaction()
329 transaction = self._gettransaction()
330
330
331 if self.hookargs:
331 if self.hookargs:
332 # the ones added to the transaction supercede those added
332 # the ones added to the transaction supercede those added
333 # to the operation.
333 # to the operation.
334 self.hookargs.update(transaction.hookargs)
334 self.hookargs.update(transaction.hookargs)
335 transaction.hookargs = self.hookargs
335 transaction.hookargs = self.hookargs
336
336
337 # mark the hookargs as flushed. further attempts to add to
337 # mark the hookargs as flushed. further attempts to add to
338 # hookargs will result in an abort.
338 # hookargs will result in an abort.
339 self.hookargs = None
339 self.hookargs = None
340
340
341 return transaction
341 return transaction
342
342
343 def addhookargs(self, hookargs):
343 def addhookargs(self, hookargs):
344 if self.hookargs is None:
344 if self.hookargs is None:
345 raise error.ProgrammingError(
345 raise error.ProgrammingError(
346 b'attempted to add hookargs to '
346 b'attempted to add hookargs to '
347 b'operation after transaction started'
347 b'operation after transaction started'
348 )
348 )
349 self.hookargs.update(hookargs)
349 self.hookargs.update(hookargs)
350
350
351
351
352 class TransactionUnavailable(RuntimeError):
352 class TransactionUnavailable(RuntimeError):
353 pass
353 pass
354
354
355
355
356 def _notransaction():
356 def _notransaction():
357 """default method to get a transaction while processing a bundle
357 """default method to get a transaction while processing a bundle
358
358
359 Raise an exception to highlight the fact that no transaction was expected
359 Raise an exception to highlight the fact that no transaction was expected
360 to be created"""
360 to be created"""
361 raise TransactionUnavailable()
361 raise TransactionUnavailable()
362
362
363
363
364 def applybundle(repo, unbundler, tr, source, url=None, **kwargs):
364 def applybundle(repo, unbundler, tr, source, url=None, **kwargs):
365 # transform me into unbundler.apply() as soon as the freeze is lifted
365 # transform me into unbundler.apply() as soon as the freeze is lifted
366 if isinstance(unbundler, unbundle20):
366 if isinstance(unbundler, unbundle20):
367 tr.hookargs[b'bundle2'] = b'1'
367 tr.hookargs[b'bundle2'] = b'1'
368 if source is not None and b'source' not in tr.hookargs:
368 if source is not None and b'source' not in tr.hookargs:
369 tr.hookargs[b'source'] = source
369 tr.hookargs[b'source'] = source
370 if url is not None and b'url' not in tr.hookargs:
370 if url is not None and b'url' not in tr.hookargs:
371 tr.hookargs[b'url'] = url
371 tr.hookargs[b'url'] = url
372 return processbundle(repo, unbundler, lambda: tr, source=source)
372 return processbundle(repo, unbundler, lambda: tr, source=source)
373 else:
373 else:
374 # the transactiongetter won't be used, but we might as well set it
374 # the transactiongetter won't be used, but we might as well set it
375 op = bundleoperation(repo, lambda: tr, source=source)
375 op = bundleoperation(repo, lambda: tr, source=source)
376 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
376 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
377 return op
377 return op
378
378
379
379
380 class partiterator(object):
380 class partiterator(object):
381 def __init__(self, repo, op, unbundler):
381 def __init__(self, repo, op, unbundler):
382 self.repo = repo
382 self.repo = repo
383 self.op = op
383 self.op = op
384 self.unbundler = unbundler
384 self.unbundler = unbundler
385 self.iterator = None
385 self.iterator = None
386 self.count = 0
386 self.count = 0
387 self.current = None
387 self.current = None
388
388
389 def __enter__(self):
389 def __enter__(self):
390 def func():
390 def func():
391 itr = enumerate(self.unbundler.iterparts(), 1)
391 itr = enumerate(self.unbundler.iterparts(), 1)
392 for count, p in itr:
392 for count, p in itr:
393 self.count = count
393 self.count = count
394 self.current = p
394 self.current = p
395 yield p
395 yield p
396 p.consume()
396 p.consume()
397 self.current = None
397 self.current = None
398
398
399 self.iterator = func()
399 self.iterator = func()
400 return self.iterator
400 return self.iterator
401
401
402 def __exit__(self, type, exc, tb):
402 def __exit__(self, type, exc, tb):
403 if not self.iterator:
403 if not self.iterator:
404 return
404 return
405
405
406 # Only gracefully abort in a normal exception situation. User aborts
406 # Only gracefully abort in a normal exception situation. User aborts
407 # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
407 # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
408 # and should not gracefully cleanup.
408 # and should not gracefully cleanup.
409 if isinstance(exc, Exception):
409 if isinstance(exc, Exception):
410 # Any exceptions seeking to the end of the bundle at this point are
410 # Any exceptions seeking to the end of the bundle at this point are
411 # almost certainly related to the underlying stream being bad.
411 # almost certainly related to the underlying stream being bad.
412 # And, chances are that the exception we're handling is related to
412 # And, chances are that the exception we're handling is related to
413 # getting in that bad state. So, we swallow the seeking error and
413 # getting in that bad state. So, we swallow the seeking error and
414 # re-raise the original error.
414 # re-raise the original error.
415 seekerror = False
415 seekerror = False
416 try:
416 try:
417 if self.current:
417 if self.current:
418 # consume the part content to not corrupt the stream.
418 # consume the part content to not corrupt the stream.
419 self.current.consume()
419 self.current.consume()
420
420
421 for part in self.iterator:
421 for part in self.iterator:
422 # consume the bundle content
422 # consume the bundle content
423 part.consume()
423 part.consume()
424 except Exception:
424 except Exception:
425 seekerror = True
425 seekerror = True
426
426
427 # Small hack to let caller code distinguish exceptions from bundle2
427 # Small hack to let caller code distinguish exceptions from bundle2
428 # processing from processing the old format. This is mostly needed
428 # processing from processing the old format. This is mostly needed
429 # to handle different return codes to unbundle according to the type
429 # to handle different return codes to unbundle according to the type
430 # of bundle. We should probably clean up or drop this return code
430 # of bundle. We should probably clean up or drop this return code
431 # craziness in a future version.
431 # craziness in a future version.
432 exc.duringunbundle2 = True
432 exc.duringunbundle2 = True
433 salvaged = []
433 salvaged = []
434 replycaps = None
434 replycaps = None
435 if self.op.reply is not None:
435 if self.op.reply is not None:
436 salvaged = self.op.reply.salvageoutput()
436 salvaged = self.op.reply.salvageoutput()
437 replycaps = self.op.reply.capabilities
437 replycaps = self.op.reply.capabilities
438 exc._replycaps = replycaps
438 exc._replycaps = replycaps
439 exc._bundle2salvagedoutput = salvaged
439 exc._bundle2salvagedoutput = salvaged
440
440
441 # Re-raising from a variable loses the original stack. So only use
441 # Re-raising from a variable loses the original stack. So only use
442 # that form if we need to.
442 # that form if we need to.
443 if seekerror:
443 if seekerror:
444 raise exc
444 raise exc
445
445
446 self.repo.ui.debug(
446 self.repo.ui.debug(
447 b'bundle2-input-bundle: %i parts total\n' % self.count
447 b'bundle2-input-bundle: %i parts total\n' % self.count
448 )
448 )
449
449
450
450
451 def processbundle(repo, unbundler, transactiongetter=None, op=None, source=b''):
451 def processbundle(repo, unbundler, transactiongetter=None, op=None, source=b''):
452 """This function process a bundle, apply effect to/from a repo
452 """This function process a bundle, apply effect to/from a repo
453
453
454 It iterates over each part then searches for and uses the proper handling
454 It iterates over each part then searches for and uses the proper handling
455 code to process the part. Parts are processed in order.
455 code to process the part. Parts are processed in order.
456
456
457 Unknown Mandatory part will abort the process.
457 Unknown Mandatory part will abort the process.
458
458
459 It is temporarily possible to provide a prebuilt bundleoperation to the
459 It is temporarily possible to provide a prebuilt bundleoperation to the
460 function. This is used to ensure output is properly propagated in case of
460 function. This is used to ensure output is properly propagated in case of
461 an error during the unbundling. This output capturing part will likely be
461 an error during the unbundling. This output capturing part will likely be
462 reworked and this ability will probably go away in the process.
462 reworked and this ability will probably go away in the process.
463 """
463 """
464 if op is None:
464 if op is None:
465 if transactiongetter is None:
465 if transactiongetter is None:
466 transactiongetter = _notransaction
466 transactiongetter = _notransaction
467 op = bundleoperation(repo, transactiongetter, source=source)
467 op = bundleoperation(repo, transactiongetter, source=source)
468 # todo:
468 # todo:
469 # - replace this is a init function soon.
469 # - replace this is a init function soon.
470 # - exception catching
470 # - exception catching
471 unbundler.params
471 unbundler.params
472 if repo.ui.debugflag:
472 if repo.ui.debugflag:
473 msg = [b'bundle2-input-bundle:']
473 msg = [b'bundle2-input-bundle:']
474 if unbundler.params:
474 if unbundler.params:
475 msg.append(b' %i params' % len(unbundler.params))
475 msg.append(b' %i params' % len(unbundler.params))
476 if op._gettransaction is None or op._gettransaction is _notransaction:
476 if op._gettransaction is None or op._gettransaction is _notransaction:
477 msg.append(b' no-transaction')
477 msg.append(b' no-transaction')
478 else:
478 else:
479 msg.append(b' with-transaction')
479 msg.append(b' with-transaction')
480 msg.append(b'\n')
480 msg.append(b'\n')
481 repo.ui.debug(b''.join(msg))
481 repo.ui.debug(b''.join(msg))
482
482
483 processparts(repo, op, unbundler)
483 processparts(repo, op, unbundler)
484
484
485 return op
485 return op
486
486
487
487
488 def processparts(repo, op, unbundler):
488 def processparts(repo, op, unbundler):
489 with partiterator(repo, op, unbundler) as parts:
489 with partiterator(repo, op, unbundler) as parts:
490 for part in parts:
490 for part in parts:
491 _processpart(op, part)
491 _processpart(op, part)
492
492
493
493
494 def _processchangegroup(op, cg, tr, source, url, **kwargs):
494 def _processchangegroup(op, cg, tr, source, url, **kwargs):
495 ret = cg.apply(op.repo, tr, source, url, **kwargs)
495 ret = cg.apply(op.repo, tr, source, url, **kwargs)
496 op.records.add(
496 op.records.add(
497 b'changegroup',
497 b'changegroup',
498 {
498 {
499 b'return': ret,
499 b'return': ret,
500 },
500 },
501 )
501 )
502 return ret
502 return ret
503
503
504
504
505 def _gethandler(op, part):
505 def _gethandler(op, part):
506 status = b'unknown' # used by debug output
506 status = b'unknown' # used by debug output
507 try:
507 try:
508 handler = parthandlermapping.get(part.type)
508 handler = parthandlermapping.get(part.type)
509 if handler is None:
509 if handler is None:
510 status = b'unsupported-type'
510 status = b'unsupported-type'
511 raise error.BundleUnknownFeatureError(parttype=part.type)
511 raise error.BundleUnknownFeatureError(parttype=part.type)
512 indebug(op.ui, b'found a handler for part %s' % part.type)
512 indebug(op.ui, b'found a handler for part %s' % part.type)
513 unknownparams = part.mandatorykeys - handler.params
513 unknownparams = part.mandatorykeys - handler.params
514 if unknownparams:
514 if unknownparams:
515 unknownparams = list(unknownparams)
515 unknownparams = list(unknownparams)
516 unknownparams.sort()
516 unknownparams.sort()
517 status = b'unsupported-params (%s)' % b', '.join(unknownparams)
517 status = b'unsupported-params (%s)' % b', '.join(unknownparams)
518 raise error.BundleUnknownFeatureError(
518 raise error.BundleUnknownFeatureError(
519 parttype=part.type, params=unknownparams
519 parttype=part.type, params=unknownparams
520 )
520 )
521 status = b'supported'
521 status = b'supported'
522 except error.BundleUnknownFeatureError as exc:
522 except error.BundleUnknownFeatureError as exc:
523 if part.mandatory: # mandatory parts
523 if part.mandatory: # mandatory parts
524 raise
524 raise
525 indebug(op.ui, b'ignoring unsupported advisory part %s' % exc)
525 indebug(op.ui, b'ignoring unsupported advisory part %s' % exc)
526 return # skip to part processing
526 return # skip to part processing
527 finally:
527 finally:
528 if op.ui.debugflag:
528 if op.ui.debugflag:
529 msg = [b'bundle2-input-part: "%s"' % part.type]
529 msg = [b'bundle2-input-part: "%s"' % part.type]
530 if not part.mandatory:
530 if not part.mandatory:
531 msg.append(b' (advisory)')
531 msg.append(b' (advisory)')
532 nbmp = len(part.mandatorykeys)
532 nbmp = len(part.mandatorykeys)
533 nbap = len(part.params) - nbmp
533 nbap = len(part.params) - nbmp
534 if nbmp or nbap:
534 if nbmp or nbap:
535 msg.append(b' (params:')
535 msg.append(b' (params:')
536 if nbmp:
536 if nbmp:
537 msg.append(b' %i mandatory' % nbmp)
537 msg.append(b' %i mandatory' % nbmp)
538 if nbap:
538 if nbap:
539 msg.append(b' %i advisory' % nbmp)
539 msg.append(b' %i advisory' % nbmp)
540 msg.append(b')')
540 msg.append(b')')
541 msg.append(b' %s\n' % status)
541 msg.append(b' %s\n' % status)
542 op.ui.debug(b''.join(msg))
542 op.ui.debug(b''.join(msg))
543
543
544 return handler
544 return handler
545
545
546
546
547 def _processpart(op, part):
547 def _processpart(op, part):
548 """process a single part from a bundle
548 """process a single part from a bundle
549
549
550 The part is guaranteed to have been fully consumed when the function exits
550 The part is guaranteed to have been fully consumed when the function exits
551 (even if an exception is raised)."""
551 (even if an exception is raised)."""
552 handler = _gethandler(op, part)
552 handler = _gethandler(op, part)
553 if handler is None:
553 if handler is None:
554 return
554 return
555
555
556 # handler is called outside the above try block so that we don't
556 # handler is called outside the above try block so that we don't
557 # risk catching KeyErrors from anything other than the
557 # risk catching KeyErrors from anything other than the
558 # parthandlermapping lookup (any KeyError raised by handler()
558 # parthandlermapping lookup (any KeyError raised by handler()
559 # itself represents a defect of a different variety).
559 # itself represents a defect of a different variety).
560 output = None
560 output = None
561 if op.captureoutput and op.reply is not None:
561 if op.captureoutput and op.reply is not None:
562 op.ui.pushbuffer(error=True, subproc=True)
562 op.ui.pushbuffer(error=True, subproc=True)
563 output = b''
563 output = b''
564 try:
564 try:
565 handler(op, part)
565 handler(op, part)
566 finally:
566 finally:
567 if output is not None:
567 if output is not None:
568 output = op.ui.popbuffer()
568 output = op.ui.popbuffer()
569 if output:
569 if output:
570 outpart = op.reply.newpart(b'output', data=output, mandatory=False)
570 outpart = op.reply.newpart(b'output', data=output, mandatory=False)
571 outpart.addparam(
571 outpart.addparam(
572 b'in-reply-to', pycompat.bytestr(part.id), mandatory=False
572 b'in-reply-to', pycompat.bytestr(part.id), mandatory=False
573 )
573 )
574
574
575
575
576 def decodecaps(blob):
576 def decodecaps(blob):
577 """decode a bundle2 caps bytes blob into a dictionary
577 """decode a bundle2 caps bytes blob into a dictionary
578
578
579 The blob is a list of capabilities (one per line)
579 The blob is a list of capabilities (one per line)
580 Capabilities may have values using a line of the form::
580 Capabilities may have values using a line of the form::
581
581
582 capability=value1,value2,value3
582 capability=value1,value2,value3
583
583
584 The values are always a list."""
584 The values are always a list."""
585 caps = {}
585 caps = {}
586 for line in blob.splitlines():
586 for line in blob.splitlines():
587 if not line:
587 if not line:
588 continue
588 continue
589 if b'=' not in line:
589 if b'=' not in line:
590 key, vals = line, ()
590 key, vals = line, ()
591 else:
591 else:
592 key, vals = line.split(b'=', 1)
592 key, vals = line.split(b'=', 1)
593 vals = vals.split(b',')
593 vals = vals.split(b',')
594 key = urlreq.unquote(key)
594 key = urlreq.unquote(key)
595 vals = [urlreq.unquote(v) for v in vals]
595 vals = [urlreq.unquote(v) for v in vals]
596 caps[key] = vals
596 caps[key] = vals
597 return caps
597 return caps
598
598
599
599
600 def encodecaps(caps):
600 def encodecaps(caps):
601 """encode a bundle2 caps dictionary into a bytes blob"""
601 """encode a bundle2 caps dictionary into a bytes blob"""
602 chunks = []
602 chunks = []
603 for ca in sorted(caps):
603 for ca in sorted(caps):
604 vals = caps[ca]
604 vals = caps[ca]
605 ca = urlreq.quote(ca)
605 ca = urlreq.quote(ca)
606 vals = [urlreq.quote(v) for v in vals]
606 vals = [urlreq.quote(v) for v in vals]
607 if vals:
607 if vals:
608 ca = b"%s=%s" % (ca, b','.join(vals))
608 ca = b"%s=%s" % (ca, b','.join(vals))
609 chunks.append(ca)
609 chunks.append(ca)
610 return b'\n'.join(chunks)
610 return b'\n'.join(chunks)
611
611
612
612
613 bundletypes = {
613 bundletypes = {
614 b"": (b"", b'UN'), # only when using unbundle on ssh and old http servers
614 b"": (b"", b'UN'), # only when using unbundle on ssh and old http servers
615 # since the unification ssh accepts a header but there
615 # since the unification ssh accepts a header but there
616 # is no capability signaling it.
616 # is no capability signaling it.
617 b"HG20": (), # special-cased below
617 b"HG20": (), # special-cased below
618 b"HG10UN": (b"HG10UN", b'UN'),
618 b"HG10UN": (b"HG10UN", b'UN'),
619 b"HG10BZ": (b"HG10", b'BZ'),
619 b"HG10BZ": (b"HG10", b'BZ'),
620 b"HG10GZ": (b"HG10GZ", b'GZ'),
620 b"HG10GZ": (b"HG10GZ", b'GZ'),
621 }
621 }
622
622
623 # hgweb uses this list to communicate its preferred type
623 # hgweb uses this list to communicate its preferred type
624 bundlepriority = [b'HG10GZ', b'HG10BZ', b'HG10UN']
624 bundlepriority = [b'HG10GZ', b'HG10BZ', b'HG10UN']
625
625
626
626
627 class bundle20(object):
627 class bundle20(object):
628 """represent an outgoing bundle2 container
628 """represent an outgoing bundle2 container
629
629
630 Use the `addparam` method to add stream level parameter. and `newpart` to
630 Use the `addparam` method to add stream level parameter. and `newpart` to
631 populate it. Then call `getchunks` to retrieve all the binary chunks of
631 populate it. Then call `getchunks` to retrieve all the binary chunks of
632 data that compose the bundle2 container."""
632 data that compose the bundle2 container."""
633
633
634 _magicstring = b'HG20'
634 _magicstring = b'HG20'
635
635
636 def __init__(self, ui, capabilities=()):
636 def __init__(self, ui, capabilities=()):
637 self.ui = ui
637 self.ui = ui
638 self._params = []
638 self._params = []
639 self._parts = []
639 self._parts = []
640 self.capabilities = dict(capabilities)
640 self.capabilities = dict(capabilities)
641 self._compengine = util.compengines.forbundletype(b'UN')
641 self._compengine = util.compengines.forbundletype(b'UN')
642 self._compopts = None
642 self._compopts = None
643 # If compression is being handled by a consumer of the raw
643 # If compression is being handled by a consumer of the raw
644 # data (e.g. the wire protocol), unsetting this flag tells
644 # data (e.g. the wire protocol), unsetting this flag tells
645 # consumers that the bundle is best left uncompressed.
645 # consumers that the bundle is best left uncompressed.
646 self.prefercompressed = True
646 self.prefercompressed = True
647
647
648 def setcompression(self, alg, compopts=None):
648 def setcompression(self, alg, compopts=None):
649 """setup core part compression to <alg>"""
649 """setup core part compression to <alg>"""
650 if alg in (None, b'UN'):
650 if alg in (None, b'UN'):
651 return
651 return
652 assert not any(n.lower() == b'compression' for n, v in self._params)
652 assert not any(n.lower() == b'compression' for n, v in self._params)
653 self.addparam(b'Compression', alg)
653 self.addparam(b'Compression', alg)
654 self._compengine = util.compengines.forbundletype(alg)
654 self._compengine = util.compengines.forbundletype(alg)
655 self._compopts = compopts
655 self._compopts = compopts
656
656
657 @property
657 @property
658 def nbparts(self):
658 def nbparts(self):
659 """total number of parts added to the bundler"""
659 """total number of parts added to the bundler"""
660 return len(self._parts)
660 return len(self._parts)
661
661
662 # methods used to defines the bundle2 content
662 # methods used to defines the bundle2 content
663 def addparam(self, name, value=None):
663 def addparam(self, name, value=None):
664 """add a stream level parameter"""
664 """add a stream level parameter"""
665 if not name:
665 if not name:
666 raise error.ProgrammingError(b'empty parameter name')
666 raise error.ProgrammingError(b'empty parameter name')
667 if name[0:1] not in pycompat.bytestr(
667 if name[0:1] not in pycompat.bytestr(
668 string.ascii_letters # pytype: disable=wrong-arg-types
668 string.ascii_letters # pytype: disable=wrong-arg-types
669 ):
669 ):
670 raise error.ProgrammingError(
670 raise error.ProgrammingError(
671 b'non letter first character: %s' % name
671 b'non letter first character: %s' % name
672 )
672 )
673 self._params.append((name, value))
673 self._params.append((name, value))
674
674
675 def addpart(self, part):
675 def addpart(self, part):
676 """add a new part to the bundle2 container
676 """add a new part to the bundle2 container
677
677
678 Parts contains the actual applicative payload."""
678 Parts contains the actual applicative payload."""
679 assert part.id is None
679 assert part.id is None
680 part.id = len(self._parts) # very cheap counter
680 part.id = len(self._parts) # very cheap counter
681 self._parts.append(part)
681 self._parts.append(part)
682
682
683 def newpart(self, typeid, *args, **kwargs):
683 def newpart(self, typeid, *args, **kwargs):
684 """create a new part and add it to the containers
684 """create a new part and add it to the containers
685
685
686 As the part is directly added to the containers. For now, this means
686 As the part is directly added to the containers. For now, this means
687 that any failure to properly initialize the part after calling
687 that any failure to properly initialize the part after calling
688 ``newpart`` should result in a failure of the whole bundling process.
688 ``newpart`` should result in a failure of the whole bundling process.
689
689
690 You can still fall back to manually create and add if you need better
690 You can still fall back to manually create and add if you need better
691 control."""
691 control."""
692 part = bundlepart(typeid, *args, **kwargs)
692 part = bundlepart(typeid, *args, **kwargs)
693 self.addpart(part)
693 self.addpart(part)
694 return part
694 return part
695
695
696 # methods used to generate the bundle2 stream
696 # methods used to generate the bundle2 stream
697 def getchunks(self):
697 def getchunks(self):
698 if self.ui.debugflag:
698 if self.ui.debugflag:
699 msg = [b'bundle2-output-bundle: "%s",' % self._magicstring]
699 msg = [b'bundle2-output-bundle: "%s",' % self._magicstring]
700 if self._params:
700 if self._params:
701 msg.append(b' (%i params)' % len(self._params))
701 msg.append(b' (%i params)' % len(self._params))
702 msg.append(b' %i parts total\n' % len(self._parts))
702 msg.append(b' %i parts total\n' % len(self._parts))
703 self.ui.debug(b''.join(msg))
703 self.ui.debug(b''.join(msg))
704 outdebug(self.ui, b'start emission of %s stream' % self._magicstring)
704 outdebug(self.ui, b'start emission of %s stream' % self._magicstring)
705 yield self._magicstring
705 yield self._magicstring
706 param = self._paramchunk()
706 param = self._paramchunk()
707 outdebug(self.ui, b'bundle parameter: %s' % param)
707 outdebug(self.ui, b'bundle parameter: %s' % param)
708 yield _pack(_fstreamparamsize, len(param))
708 yield _pack(_fstreamparamsize, len(param))
709 if param:
709 if param:
710 yield param
710 yield param
711 for chunk in self._compengine.compressstream(
711 for chunk in self._compengine.compressstream(
712 self._getcorechunk(), self._compopts
712 self._getcorechunk(), self._compopts
713 ):
713 ):
714 yield chunk
714 yield chunk
715
715
716 def _paramchunk(self):
716 def _paramchunk(self):
717 """return a encoded version of all stream parameters"""
717 """return a encoded version of all stream parameters"""
718 blocks = []
718 blocks = []
719 for par, value in self._params:
719 for par, value in self._params:
720 par = urlreq.quote(par)
720 par = urlreq.quote(par)
721 if value is not None:
721 if value is not None:
722 value = urlreq.quote(value)
722 value = urlreq.quote(value)
723 par = b'%s=%s' % (par, value)
723 par = b'%s=%s' % (par, value)
724 blocks.append(par)
724 blocks.append(par)
725 return b' '.join(blocks)
725 return b' '.join(blocks)
726
726
727 def _getcorechunk(self):
727 def _getcorechunk(self):
728 """yield chunk for the core part of the bundle
728 """yield chunk for the core part of the bundle
729
729
730 (all but headers and parameters)"""
730 (all but headers and parameters)"""
731 outdebug(self.ui, b'start of parts')
731 outdebug(self.ui, b'start of parts')
732 for part in self._parts:
732 for part in self._parts:
733 outdebug(self.ui, b'bundle part: "%s"' % part.type)
733 outdebug(self.ui, b'bundle part: "%s"' % part.type)
734 for chunk in part.getchunks(ui=self.ui):
734 for chunk in part.getchunks(ui=self.ui):
735 yield chunk
735 yield chunk
736 outdebug(self.ui, b'end of bundle')
736 outdebug(self.ui, b'end of bundle')
737 yield _pack(_fpartheadersize, 0)
737 yield _pack(_fpartheadersize, 0)
738
738
739 def salvageoutput(self):
739 def salvageoutput(self):
740 """return a list with a copy of all output parts in the bundle
740 """return a list with a copy of all output parts in the bundle
741
741
742 This is meant to be used during error handling to make sure we preserve
742 This is meant to be used during error handling to make sure we preserve
743 server output"""
743 server output"""
744 salvaged = []
744 salvaged = []
745 for part in self._parts:
745 for part in self._parts:
746 if part.type.startswith(b'output'):
746 if part.type.startswith(b'output'):
747 salvaged.append(part.copy())
747 salvaged.append(part.copy())
748 return salvaged
748 return salvaged
749
749
750
750
751 class unpackermixin(object):
751 class unpackermixin(object):
752 """A mixin to extract bytes and struct data from a stream"""
752 """A mixin to extract bytes and struct data from a stream"""
753
753
754 def __init__(self, fp):
754 def __init__(self, fp):
755 self._fp = fp
755 self._fp = fp
756
756
757 def _unpack(self, format):
757 def _unpack(self, format):
758 """unpack this struct format from the stream
758 """unpack this struct format from the stream
759
759
760 This method is meant for internal usage by the bundle2 protocol only.
760 This method is meant for internal usage by the bundle2 protocol only.
761 They directly manipulate the low level stream including bundle2 level
761 They directly manipulate the low level stream including bundle2 level
762 instruction.
762 instruction.
763
763
764 Do not use it to implement higher-level logic or methods."""
764 Do not use it to implement higher-level logic or methods."""
765 data = self._readexact(struct.calcsize(format))
765 data = self._readexact(struct.calcsize(format))
766 return _unpack(format, data)
766 return _unpack(format, data)
767
767
768 def _readexact(self, size):
768 def _readexact(self, size):
769 """read exactly <size> bytes from the stream
769 """read exactly <size> bytes from the stream
770
770
771 This method is meant for internal usage by the bundle2 protocol only.
771 This method is meant for internal usage by the bundle2 protocol only.
772 They directly manipulate the low level stream including bundle2 level
772 They directly manipulate the low level stream including bundle2 level
773 instruction.
773 instruction.
774
774
775 Do not use it to implement higher-level logic or methods."""
775 Do not use it to implement higher-level logic or methods."""
776 return changegroup.readexactly(self._fp, size)
776 return changegroup.readexactly(self._fp, size)
777
777
778
778
779 def getunbundler(ui, fp, magicstring=None):
779 def getunbundler(ui, fp, magicstring=None):
780 """return a valid unbundler object for a given magicstring"""
780 """return a valid unbundler object for a given magicstring"""
781 if magicstring is None:
781 if magicstring is None:
782 magicstring = changegroup.readexactly(fp, 4)
782 magicstring = changegroup.readexactly(fp, 4)
783 magic, version = magicstring[0:2], magicstring[2:4]
783 magic, version = magicstring[0:2], magicstring[2:4]
784 if magic != b'HG':
784 if magic != b'HG':
785 ui.debug(
785 ui.debug(
786 b"error: invalid magic: %r (version %r), should be 'HG'\n"
786 b"error: invalid magic: %r (version %r), should be 'HG'\n"
787 % (magic, version)
787 % (magic, version)
788 )
788 )
789 raise error.Abort(_(b'not a Mercurial bundle'))
789 raise error.Abort(_(b'not a Mercurial bundle'))
790 unbundlerclass = formatmap.get(version)
790 unbundlerclass = formatmap.get(version)
791 if unbundlerclass is None:
791 if unbundlerclass is None:
792 raise error.Abort(_(b'unknown bundle version %s') % version)
792 raise error.Abort(_(b'unknown bundle version %s') % version)
793 unbundler = unbundlerclass(ui, fp)
793 unbundler = unbundlerclass(ui, fp)
794 indebug(ui, b'start processing of %s stream' % magicstring)
794 indebug(ui, b'start processing of %s stream' % magicstring)
795 return unbundler
795 return unbundler
796
796
797
797
798 class unbundle20(unpackermixin):
798 class unbundle20(unpackermixin):
799 """interpret a bundle2 stream
799 """interpret a bundle2 stream
800
800
801 This class is fed with a binary stream and yields parts through its
801 This class is fed with a binary stream and yields parts through its
802 `iterparts` methods."""
802 `iterparts` methods."""
803
803
804 _magicstring = b'HG20'
804 _magicstring = b'HG20'
805
805
806 def __init__(self, ui, fp):
806 def __init__(self, ui, fp):
807 """If header is specified, we do not read it out of the stream."""
807 """If header is specified, we do not read it out of the stream."""
808 self.ui = ui
808 self.ui = ui
809 self._compengine = util.compengines.forbundletype(b'UN')
809 self._compengine = util.compengines.forbundletype(b'UN')
810 self._compressed = None
810 self._compressed = None
811 super(unbundle20, self).__init__(fp)
811 super(unbundle20, self).__init__(fp)
812
812
813 @util.propertycache
813 @util.propertycache
814 def params(self):
814 def params(self):
815 """dictionary of stream level parameters"""
815 """dictionary of stream level parameters"""
816 indebug(self.ui, b'reading bundle2 stream parameters')
816 indebug(self.ui, b'reading bundle2 stream parameters')
817 params = {}
817 params = {}
818 paramssize = self._unpack(_fstreamparamsize)[0]
818 paramssize = self._unpack(_fstreamparamsize)[0]
819 if paramssize < 0:
819 if paramssize < 0:
820 raise error.BundleValueError(
820 raise error.BundleValueError(
821 b'negative bundle param size: %i' % paramssize
821 b'negative bundle param size: %i' % paramssize
822 )
822 )
823 if paramssize:
823 if paramssize:
824 params = self._readexact(paramssize)
824 params = self._readexact(paramssize)
825 params = self._processallparams(params)
825 params = self._processallparams(params)
826 return params
826 return params
827
827
828 def _processallparams(self, paramsblock):
828 def _processallparams(self, paramsblock):
829 """"""
829 """"""
830 params = util.sortdict()
830 params = util.sortdict()
831 for p in paramsblock.split(b' '):
831 for p in paramsblock.split(b' '):
832 p = p.split(b'=', 1)
832 p = p.split(b'=', 1)
833 p = [urlreq.unquote(i) for i in p]
833 p = [urlreq.unquote(i) for i in p]
834 if len(p) < 2:
834 if len(p) < 2:
835 p.append(None)
835 p.append(None)
836 self._processparam(*p)
836 self._processparam(*p)
837 params[p[0]] = p[1]
837 params[p[0]] = p[1]
838 return params
838 return params
839
839
840 def _processparam(self, name, value):
840 def _processparam(self, name, value):
841 """process a parameter, applying its effect if needed
841 """process a parameter, applying its effect if needed
842
842
843 Parameter starting with a lower case letter are advisory and will be
843 Parameter starting with a lower case letter are advisory and will be
844 ignored when unknown. Those starting with an upper case letter are
844 ignored when unknown. Those starting with an upper case letter are
845 mandatory and will this function will raise a KeyError when unknown.
845 mandatory and will this function will raise a KeyError when unknown.
846
846
847 Note: no option are currently supported. Any input will be either
847 Note: no option are currently supported. Any input will be either
848 ignored or failing.
848 ignored or failing.
849 """
849 """
850 if not name:
850 if not name:
851 raise ValueError('empty parameter name')
851 raise ValueError('empty parameter name')
852 if name[0:1] not in pycompat.bytestr(
852 if name[0:1] not in pycompat.bytestr(
853 string.ascii_letters # pytype: disable=wrong-arg-types
853 string.ascii_letters # pytype: disable=wrong-arg-types
854 ):
854 ):
855 raise ValueError('non letter first character: %s' % name)
855 raise ValueError('non letter first character: %s' % name)
856 try:
856 try:
857 handler = b2streamparamsmap[name.lower()]
857 handler = b2streamparamsmap[name.lower()]
858 except KeyError:
858 except KeyError:
859 if name[0:1].islower():
859 if name[0:1].islower():
860 indebug(self.ui, b"ignoring unknown parameter %s" % name)
860 indebug(self.ui, b"ignoring unknown parameter %s" % name)
861 else:
861 else:
862 raise error.BundleUnknownFeatureError(params=(name,))
862 raise error.BundleUnknownFeatureError(params=(name,))
863 else:
863 else:
864 handler(self, name, value)
864 handler(self, name, value)
865
865
866 def _forwardchunks(self):
866 def _forwardchunks(self):
867 """utility to transfer a bundle2 as binary
867 """utility to transfer a bundle2 as binary
868
868
869 This is made necessary by the fact the 'getbundle' command over 'ssh'
869 This is made necessary by the fact the 'getbundle' command over 'ssh'
870 have no way to know then the reply end, relying on the bundle to be
870 have no way to know then the reply end, relying on the bundle to be
871 interpreted to know its end. This is terrible and we are sorry, but we
871 interpreted to know its end. This is terrible and we are sorry, but we
872 needed to move forward to get general delta enabled.
872 needed to move forward to get general delta enabled.
873 """
873 """
874 yield self._magicstring
874 yield self._magicstring
875 assert 'params' not in vars(self)
875 assert 'params' not in vars(self)
876 paramssize = self._unpack(_fstreamparamsize)[0]
876 paramssize = self._unpack(_fstreamparamsize)[0]
877 if paramssize < 0:
877 if paramssize < 0:
878 raise error.BundleValueError(
878 raise error.BundleValueError(
879 b'negative bundle param size: %i' % paramssize
879 b'negative bundle param size: %i' % paramssize
880 )
880 )
881 if paramssize:
881 if paramssize:
882 params = self._readexact(paramssize)
882 params = self._readexact(paramssize)
883 self._processallparams(params)
883 self._processallparams(params)
884 # The payload itself is decompressed below, so drop
884 # The payload itself is decompressed below, so drop
885 # the compression parameter passed down to compensate.
885 # the compression parameter passed down to compensate.
886 outparams = []
886 outparams = []
887 for p in params.split(b' '):
887 for p in params.split(b' '):
888 k, v = p.split(b'=', 1)
888 k, v = p.split(b'=', 1)
889 if k.lower() != b'compression':
889 if k.lower() != b'compression':
890 outparams.append(p)
890 outparams.append(p)
891 outparams = b' '.join(outparams)
891 outparams = b' '.join(outparams)
892 yield _pack(_fstreamparamsize, len(outparams))
892 yield _pack(_fstreamparamsize, len(outparams))
893 yield outparams
893 yield outparams
894 else:
894 else:
895 yield _pack(_fstreamparamsize, paramssize)
895 yield _pack(_fstreamparamsize, paramssize)
896 # From there, payload might need to be decompressed
896 # From there, payload might need to be decompressed
897 self._fp = self._compengine.decompressorreader(self._fp)
897 self._fp = self._compengine.decompressorreader(self._fp)
898 emptycount = 0
898 emptycount = 0
899 while emptycount < 2:
899 while emptycount < 2:
900 # so we can brainlessly loop
900 # so we can brainlessly loop
901 assert _fpartheadersize == _fpayloadsize
901 assert _fpartheadersize == _fpayloadsize
902 size = self._unpack(_fpartheadersize)[0]
902 size = self._unpack(_fpartheadersize)[0]
903 yield _pack(_fpartheadersize, size)
903 yield _pack(_fpartheadersize, size)
904 if size:
904 if size:
905 emptycount = 0
905 emptycount = 0
906 else:
906 else:
907 emptycount += 1
907 emptycount += 1
908 continue
908 continue
909 if size == flaginterrupt:
909 if size == flaginterrupt:
910 continue
910 continue
911 elif size < 0:
911 elif size < 0:
912 raise error.BundleValueError(b'negative chunk size: %i')
912 raise error.BundleValueError(b'negative chunk size: %i')
913 yield self._readexact(size)
913 yield self._readexact(size)
914
914
915 def iterparts(self, seekable=False):
915 def iterparts(self, seekable=False):
916 """yield all parts contained in the stream"""
916 """yield all parts contained in the stream"""
917 cls = seekableunbundlepart if seekable else unbundlepart
917 cls = seekableunbundlepart if seekable else unbundlepart
918 # make sure param have been loaded
918 # make sure param have been loaded
919 self.params
919 self.params
920 # From there, payload need to be decompressed
920 # From there, payload need to be decompressed
921 self._fp = self._compengine.decompressorreader(self._fp)
921 self._fp = self._compengine.decompressorreader(self._fp)
922 indebug(self.ui, b'start extraction of bundle2 parts')
922 indebug(self.ui, b'start extraction of bundle2 parts')
923 headerblock = self._readpartheader()
923 headerblock = self._readpartheader()
924 while headerblock is not None:
924 while headerblock is not None:
925 part = cls(self.ui, headerblock, self._fp)
925 part = cls(self.ui, headerblock, self._fp)
926 yield part
926 yield part
927 # Ensure part is fully consumed so we can start reading the next
927 # Ensure part is fully consumed so we can start reading the next
928 # part.
928 # part.
929 part.consume()
929 part.consume()
930
930
931 headerblock = self._readpartheader()
931 headerblock = self._readpartheader()
932 indebug(self.ui, b'end of bundle2 stream')
932 indebug(self.ui, b'end of bundle2 stream')
933
933
934 def _readpartheader(self):
934 def _readpartheader(self):
935 """reads a part header size and return the bytes blob
935 """reads a part header size and return the bytes blob
936
936
937 returns None if empty"""
937 returns None if empty"""
938 headersize = self._unpack(_fpartheadersize)[0]
938 headersize = self._unpack(_fpartheadersize)[0]
939 if headersize < 0:
939 if headersize < 0:
940 raise error.BundleValueError(
940 raise error.BundleValueError(
941 b'negative part header size: %i' % headersize
941 b'negative part header size: %i' % headersize
942 )
942 )
943 indebug(self.ui, b'part header size: %i' % headersize)
943 indebug(self.ui, b'part header size: %i' % headersize)
944 if headersize:
944 if headersize:
945 return self._readexact(headersize)
945 return self._readexact(headersize)
946 return None
946 return None
947
947
948 def compressed(self):
948 def compressed(self):
949 self.params # load params
949 self.params # load params
950 return self._compressed
950 return self._compressed
951
951
952 def close(self):
952 def close(self):
953 """close underlying file"""
953 """close underlying file"""
954 if util.safehasattr(self._fp, 'close'):
954 if util.safehasattr(self._fp, 'close'):
955 return self._fp.close()
955 return self._fp.close()
956
956
957
957
958 formatmap = {b'20': unbundle20}
958 formatmap = {b'20': unbundle20}
959
959
960 b2streamparamsmap = {}
960 b2streamparamsmap = {}
961
961
962
962
963 def b2streamparamhandler(name):
963 def b2streamparamhandler(name):
964 """register a handler for a stream level parameter"""
964 """register a handler for a stream level parameter"""
965
965
966 def decorator(func):
966 def decorator(func):
967 assert name not in formatmap
967 assert name not in formatmap
968 b2streamparamsmap[name] = func
968 b2streamparamsmap[name] = func
969 return func
969 return func
970
970
971 return decorator
971 return decorator
972
972
973
973
974 @b2streamparamhandler(b'compression')
974 @b2streamparamhandler(b'compression')
975 def processcompression(unbundler, param, value):
975 def processcompression(unbundler, param, value):
976 """read compression parameter and install payload decompression"""
976 """read compression parameter and install payload decompression"""
977 if value not in util.compengines.supportedbundletypes:
977 if value not in util.compengines.supportedbundletypes:
978 raise error.BundleUnknownFeatureError(params=(param,), values=(value,))
978 raise error.BundleUnknownFeatureError(params=(param,), values=(value,))
979 unbundler._compengine = util.compengines.forbundletype(value)
979 unbundler._compengine = util.compengines.forbundletype(value)
980 if value is not None:
980 if value is not None:
981 unbundler._compressed = True
981 unbundler._compressed = True
982
982
983
983
984 class bundlepart(object):
984 class bundlepart(object):
985 """A bundle2 part contains application level payload
985 """A bundle2 part contains application level payload
986
986
987 The part `type` is used to route the part to the application level
987 The part `type` is used to route the part to the application level
988 handler.
988 handler.
989
989
990 The part payload is contained in ``part.data``. It could be raw bytes or a
990 The part payload is contained in ``part.data``. It could be raw bytes or a
991 generator of byte chunks.
991 generator of byte chunks.
992
992
993 You can add parameters to the part using the ``addparam`` method.
993 You can add parameters to the part using the ``addparam`` method.
994 Parameters can be either mandatory (default) or advisory. Remote side
994 Parameters can be either mandatory (default) or advisory. Remote side
995 should be able to safely ignore the advisory ones.
995 should be able to safely ignore the advisory ones.
996
996
997 Both data and parameters cannot be modified after the generation has begun.
997 Both data and parameters cannot be modified after the generation has begun.
998 """
998 """
999
999
1000 def __init__(
1000 def __init__(
1001 self,
1001 self,
1002 parttype,
1002 parttype,
1003 mandatoryparams=(),
1003 mandatoryparams=(),
1004 advisoryparams=(),
1004 advisoryparams=(),
1005 data=b'',
1005 data=b'',
1006 mandatory=True,
1006 mandatory=True,
1007 ):
1007 ):
1008 validateparttype(parttype)
1008 validateparttype(parttype)
1009 self.id = None
1009 self.id = None
1010 self.type = parttype
1010 self.type = parttype
1011 self._data = data
1011 self._data = data
1012 self._mandatoryparams = list(mandatoryparams)
1012 self._mandatoryparams = list(mandatoryparams)
1013 self._advisoryparams = list(advisoryparams)
1013 self._advisoryparams = list(advisoryparams)
1014 # checking for duplicated entries
1014 # checking for duplicated entries
1015 self._seenparams = set()
1015 self._seenparams = set()
1016 for pname, __ in self._mandatoryparams + self._advisoryparams:
1016 for pname, __ in self._mandatoryparams + self._advisoryparams:
1017 if pname in self._seenparams:
1017 if pname in self._seenparams:
1018 raise error.ProgrammingError(b'duplicated params: %s' % pname)
1018 raise error.ProgrammingError(b'duplicated params: %s' % pname)
1019 self._seenparams.add(pname)
1019 self._seenparams.add(pname)
1020 # status of the part's generation:
1020 # status of the part's generation:
1021 # - None: not started,
1021 # - None: not started,
1022 # - False: currently generated,
1022 # - False: currently generated,
1023 # - True: generation done.
1023 # - True: generation done.
1024 self._generated = None
1024 self._generated = None
1025 self.mandatory = mandatory
1025 self.mandatory = mandatory
1026
1026
1027 def __repr__(self):
1027 def __repr__(self):
1028 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
1028 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
1029 return '<%s object at %x; id: %s; type: %s; mandatory: %s>' % (
1029 return '<%s object at %x; id: %s; type: %s; mandatory: %s>' % (
1030 cls,
1030 cls,
1031 id(self),
1031 id(self),
1032 self.id,
1032 self.id,
1033 self.type,
1033 self.type,
1034 self.mandatory,
1034 self.mandatory,
1035 )
1035 )
1036
1036
1037 def copy(self):
1037 def copy(self):
1038 """return a copy of the part
1038 """return a copy of the part
1039
1039
1040 The new part have the very same content but no partid assigned yet.
1040 The new part have the very same content but no partid assigned yet.
1041 Parts with generated data cannot be copied."""
1041 Parts with generated data cannot be copied."""
1042 assert not util.safehasattr(self.data, 'next')
1042 assert not util.safehasattr(self.data, 'next')
1043 return self.__class__(
1043 return self.__class__(
1044 self.type,
1044 self.type,
1045 self._mandatoryparams,
1045 self._mandatoryparams,
1046 self._advisoryparams,
1046 self._advisoryparams,
1047 self._data,
1047 self._data,
1048 self.mandatory,
1048 self.mandatory,
1049 )
1049 )
1050
1050
1051 # methods used to defines the part content
1051 # methods used to defines the part content
1052 @property
1052 @property
1053 def data(self):
1053 def data(self):
1054 return self._data
1054 return self._data
1055
1055
1056 @data.setter
1056 @data.setter
1057 def data(self, data):
1057 def data(self, data):
1058 if self._generated is not None:
1058 if self._generated is not None:
1059 raise error.ReadOnlyPartError(b'part is being generated')
1059 raise error.ReadOnlyPartError(b'part is being generated')
1060 self._data = data
1060 self._data = data
1061
1061
1062 @property
1062 @property
1063 def mandatoryparams(self):
1063 def mandatoryparams(self):
1064 # make it an immutable tuple to force people through ``addparam``
1064 # make it an immutable tuple to force people through ``addparam``
1065 return tuple(self._mandatoryparams)
1065 return tuple(self._mandatoryparams)
1066
1066
1067 @property
1067 @property
1068 def advisoryparams(self):
1068 def advisoryparams(self):
1069 # make it an immutable tuple to force people through ``addparam``
1069 # make it an immutable tuple to force people through ``addparam``
1070 return tuple(self._advisoryparams)
1070 return tuple(self._advisoryparams)
1071
1071
1072 def addparam(self, name, value=b'', mandatory=True):
1072 def addparam(self, name, value=b'', mandatory=True):
1073 """add a parameter to the part
1073 """add a parameter to the part
1074
1074
1075 If 'mandatory' is set to True, the remote handler must claim support
1075 If 'mandatory' is set to True, the remote handler must claim support
1076 for this parameter or the unbundling will be aborted.
1076 for this parameter or the unbundling will be aborted.
1077
1077
1078 The 'name' and 'value' cannot exceed 255 bytes each.
1078 The 'name' and 'value' cannot exceed 255 bytes each.
1079 """
1079 """
1080 if self._generated is not None:
1080 if self._generated is not None:
1081 raise error.ReadOnlyPartError(b'part is being generated')
1081 raise error.ReadOnlyPartError(b'part is being generated')
1082 if name in self._seenparams:
1082 if name in self._seenparams:
1083 raise ValueError(b'duplicated params: %s' % name)
1083 raise ValueError(b'duplicated params: %s' % name)
1084 self._seenparams.add(name)
1084 self._seenparams.add(name)
1085 params = self._advisoryparams
1085 params = self._advisoryparams
1086 if mandatory:
1086 if mandatory:
1087 params = self._mandatoryparams
1087 params = self._mandatoryparams
1088 params.append((name, value))
1088 params.append((name, value))
1089
1089
1090 # methods used to generates the bundle2 stream
1090 # methods used to generates the bundle2 stream
1091 def getchunks(self, ui):
1091 def getchunks(self, ui):
1092 if self._generated is not None:
1092 if self._generated is not None:
1093 raise error.ProgrammingError(b'part can only be consumed once')
1093 raise error.ProgrammingError(b'part can only be consumed once')
1094 self._generated = False
1094 self._generated = False
1095
1095
1096 if ui.debugflag:
1096 if ui.debugflag:
1097 msg = [b'bundle2-output-part: "%s"' % self.type]
1097 msg = [b'bundle2-output-part: "%s"' % self.type]
1098 if not self.mandatory:
1098 if not self.mandatory:
1099 msg.append(b' (advisory)')
1099 msg.append(b' (advisory)')
1100 nbmp = len(self.mandatoryparams)
1100 nbmp = len(self.mandatoryparams)
1101 nbap = len(self.advisoryparams)
1101 nbap = len(self.advisoryparams)
1102 if nbmp or nbap:
1102 if nbmp or nbap:
1103 msg.append(b' (params:')
1103 msg.append(b' (params:')
1104 if nbmp:
1104 if nbmp:
1105 msg.append(b' %i mandatory' % nbmp)
1105 msg.append(b' %i mandatory' % nbmp)
1106 if nbap:
1106 if nbap:
1107 msg.append(b' %i advisory' % nbmp)
1107 msg.append(b' %i advisory' % nbmp)
1108 msg.append(b')')
1108 msg.append(b')')
1109 if not self.data:
1109 if not self.data:
1110 msg.append(b' empty payload')
1110 msg.append(b' empty payload')
1111 elif util.safehasattr(self.data, 'next') or util.safehasattr(
1111 elif util.safehasattr(self.data, 'next') or util.safehasattr(
1112 self.data, b'__next__'
1112 self.data, b'__next__'
1113 ):
1113 ):
1114 msg.append(b' streamed payload')
1114 msg.append(b' streamed payload')
1115 else:
1115 else:
1116 msg.append(b' %i bytes payload' % len(self.data))
1116 msg.append(b' %i bytes payload' % len(self.data))
1117 msg.append(b'\n')
1117 msg.append(b'\n')
1118 ui.debug(b''.join(msg))
1118 ui.debug(b''.join(msg))
1119
1119
1120 #### header
1120 #### header
1121 if self.mandatory:
1121 if self.mandatory:
1122 parttype = self.type.upper()
1122 parttype = self.type.upper()
1123 else:
1123 else:
1124 parttype = self.type.lower()
1124 parttype = self.type.lower()
1125 outdebug(ui, b'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1125 outdebug(ui, b'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1126 ## parttype
1126 ## parttype
1127 header = [
1127 header = [
1128 _pack(_fparttypesize, len(parttype)),
1128 _pack(_fparttypesize, len(parttype)),
1129 parttype,
1129 parttype,
1130 _pack(_fpartid, self.id),
1130 _pack(_fpartid, self.id),
1131 ]
1131 ]
1132 ## parameters
1132 ## parameters
1133 # count
1133 # count
1134 manpar = self.mandatoryparams
1134 manpar = self.mandatoryparams
1135 advpar = self.advisoryparams
1135 advpar = self.advisoryparams
1136 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1136 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1137 # size
1137 # size
1138 parsizes = []
1138 parsizes = []
1139 for key, value in manpar:
1139 for key, value in manpar:
1140 parsizes.append(len(key))
1140 parsizes.append(len(key))
1141 parsizes.append(len(value))
1141 parsizes.append(len(value))
1142 for key, value in advpar:
1142 for key, value in advpar:
1143 parsizes.append(len(key))
1143 parsizes.append(len(key))
1144 parsizes.append(len(value))
1144 parsizes.append(len(value))
1145 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1145 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1146 header.append(paramsizes)
1146 header.append(paramsizes)
1147 # key, value
1147 # key, value
1148 for key, value in manpar:
1148 for key, value in manpar:
1149 header.append(key)
1149 header.append(key)
1150 header.append(value)
1150 header.append(value)
1151 for key, value in advpar:
1151 for key, value in advpar:
1152 header.append(key)
1152 header.append(key)
1153 header.append(value)
1153 header.append(value)
1154 ## finalize header
1154 ## finalize header
1155 try:
1155 try:
1156 headerchunk = b''.join(header)
1156 headerchunk = b''.join(header)
1157 except TypeError:
1157 except TypeError:
1158 raise TypeError(
1158 raise TypeError(
1159 'Found a non-bytes trying to '
1159 'Found a non-bytes trying to '
1160 'build bundle part header: %r' % header
1160 'build bundle part header: %r' % header
1161 )
1161 )
1162 outdebug(ui, b'header chunk size: %i' % len(headerchunk))
1162 outdebug(ui, b'header chunk size: %i' % len(headerchunk))
1163 yield _pack(_fpartheadersize, len(headerchunk))
1163 yield _pack(_fpartheadersize, len(headerchunk))
1164 yield headerchunk
1164 yield headerchunk
1165 ## payload
1165 ## payload
1166 try:
1166 try:
1167 for chunk in self._payloadchunks():
1167 for chunk in self._payloadchunks():
1168 outdebug(ui, b'payload chunk size: %i' % len(chunk))
1168 outdebug(ui, b'payload chunk size: %i' % len(chunk))
1169 yield _pack(_fpayloadsize, len(chunk))
1169 yield _pack(_fpayloadsize, len(chunk))
1170 yield chunk
1170 yield chunk
1171 except GeneratorExit:
1171 except GeneratorExit:
1172 # GeneratorExit means that nobody is listening for our
1172 # GeneratorExit means that nobody is listening for our
1173 # results anyway, so just bail quickly rather than trying
1173 # results anyway, so just bail quickly rather than trying
1174 # to produce an error part.
1174 # to produce an error part.
1175 ui.debug(b'bundle2-generatorexit\n')
1175 ui.debug(b'bundle2-generatorexit\n')
1176 raise
1176 raise
1177 except BaseException as exc:
1177 except BaseException as exc:
1178 bexc = stringutil.forcebytestr(exc)
1178 bexc = stringutil.forcebytestr(exc)
1179 # backup exception data for later
1179 # backup exception data for later
1180 ui.debug(
1180 ui.debug(
1181 b'bundle2-input-stream-interrupt: encoding exception %s' % bexc
1181 b'bundle2-input-stream-interrupt: encoding exception %s' % bexc
1182 )
1182 )
1183 tb = sys.exc_info()[2]
1183 tb = sys.exc_info()[2]
1184 msg = b'unexpected error: %s' % bexc
1184 msg = b'unexpected error: %s' % bexc
1185 interpart = bundlepart(
1185 interpart = bundlepart(
1186 b'error:abort', [(b'message', msg)], mandatory=False
1186 b'error:abort', [(b'message', msg)], mandatory=False
1187 )
1187 )
1188 interpart.id = 0
1188 interpart.id = 0
1189 yield _pack(_fpayloadsize, -1)
1189 yield _pack(_fpayloadsize, -1)
1190 for chunk in interpart.getchunks(ui=ui):
1190 for chunk in interpart.getchunks(ui=ui):
1191 yield chunk
1191 yield chunk
1192 outdebug(ui, b'closing payload chunk')
1192 outdebug(ui, b'closing payload chunk')
1193 # abort current part payload
1193 # abort current part payload
1194 yield _pack(_fpayloadsize, 0)
1194 yield _pack(_fpayloadsize, 0)
1195 pycompat.raisewithtb(exc, tb)
1195 pycompat.raisewithtb(exc, tb)
1196 # end of payload
1196 # end of payload
1197 outdebug(ui, b'closing payload chunk')
1197 outdebug(ui, b'closing payload chunk')
1198 yield _pack(_fpayloadsize, 0)
1198 yield _pack(_fpayloadsize, 0)
1199 self._generated = True
1199 self._generated = True
1200
1200
1201 def _payloadchunks(self):
1201 def _payloadchunks(self):
1202 """yield chunks of a the part payload
1202 """yield chunks of a the part payload
1203
1203
1204 Exists to handle the different methods to provide data to a part."""
1204 Exists to handle the different methods to provide data to a part."""
1205 # we only support fixed size data now.
1205 # we only support fixed size data now.
1206 # This will be improved in the future.
1206 # This will be improved in the future.
1207 if util.safehasattr(self.data, 'next') or util.safehasattr(
1207 if util.safehasattr(self.data, 'next') or util.safehasattr(
1208 self.data, b'__next__'
1208 self.data, b'__next__'
1209 ):
1209 ):
1210 buff = util.chunkbuffer(self.data)
1210 buff = util.chunkbuffer(self.data)
1211 chunk = buff.read(preferedchunksize)
1211 chunk = buff.read(preferedchunksize)
1212 while chunk:
1212 while chunk:
1213 yield chunk
1213 yield chunk
1214 chunk = buff.read(preferedchunksize)
1214 chunk = buff.read(preferedchunksize)
1215 elif len(self.data):
1215 elif len(self.data):
1216 yield self.data
1216 yield self.data
1217
1217
1218
1218
1219 flaginterrupt = -1
1219 flaginterrupt = -1
1220
1220
1221
1221
1222 class interrupthandler(unpackermixin):
1222 class interrupthandler(unpackermixin):
1223 """read one part and process it with restricted capability
1223 """read one part and process it with restricted capability
1224
1224
1225 This allows to transmit exception raised on the producer size during part
1225 This allows to transmit exception raised on the producer size during part
1226 iteration while the consumer is reading a part.
1226 iteration while the consumer is reading a part.
1227
1227
1228 Part processed in this manner only have access to a ui object,"""
1228 Part processed in this manner only have access to a ui object,"""
1229
1229
1230 def __init__(self, ui, fp):
1230 def __init__(self, ui, fp):
1231 super(interrupthandler, self).__init__(fp)
1231 super(interrupthandler, self).__init__(fp)
1232 self.ui = ui
1232 self.ui = ui
1233
1233
1234 def _readpartheader(self):
1234 def _readpartheader(self):
1235 """reads a part header size and return the bytes blob
1235 """reads a part header size and return the bytes blob
1236
1236
1237 returns None if empty"""
1237 returns None if empty"""
1238 headersize = self._unpack(_fpartheadersize)[0]
1238 headersize = self._unpack(_fpartheadersize)[0]
1239 if headersize < 0:
1239 if headersize < 0:
1240 raise error.BundleValueError(
1240 raise error.BundleValueError(
1241 b'negative part header size: %i' % headersize
1241 b'negative part header size: %i' % headersize
1242 )
1242 )
1243 indebug(self.ui, b'part header size: %i\n' % headersize)
1243 indebug(self.ui, b'part header size: %i\n' % headersize)
1244 if headersize:
1244 if headersize:
1245 return self._readexact(headersize)
1245 return self._readexact(headersize)
1246 return None
1246 return None
1247
1247
1248 def __call__(self):
1248 def __call__(self):
1249
1249
1250 self.ui.debug(
1250 self.ui.debug(
1251 b'bundle2-input-stream-interrupt: opening out of band context\n'
1251 b'bundle2-input-stream-interrupt: opening out of band context\n'
1252 )
1252 )
1253 indebug(self.ui, b'bundle2 stream interruption, looking for a part.')
1253 indebug(self.ui, b'bundle2 stream interruption, looking for a part.')
1254 headerblock = self._readpartheader()
1254 headerblock = self._readpartheader()
1255 if headerblock is None:
1255 if headerblock is None:
1256 indebug(self.ui, b'no part found during interruption.')
1256 indebug(self.ui, b'no part found during interruption.')
1257 return
1257 return
1258 part = unbundlepart(self.ui, headerblock, self._fp)
1258 part = unbundlepart(self.ui, headerblock, self._fp)
1259 op = interruptoperation(self.ui)
1259 op = interruptoperation(self.ui)
1260 hardabort = False
1260 hardabort = False
1261 try:
1261 try:
1262 _processpart(op, part)
1262 _processpart(op, part)
1263 except (SystemExit, KeyboardInterrupt):
1263 except (SystemExit, KeyboardInterrupt):
1264 hardabort = True
1264 hardabort = True
1265 raise
1265 raise
1266 finally:
1266 finally:
1267 if not hardabort:
1267 if not hardabort:
1268 part.consume()
1268 part.consume()
1269 self.ui.debug(
1269 self.ui.debug(
1270 b'bundle2-input-stream-interrupt: closing out of band context\n'
1270 b'bundle2-input-stream-interrupt: closing out of band context\n'
1271 )
1271 )
1272
1272
1273
1273
1274 class interruptoperation(object):
1274 class interruptoperation(object):
1275 """A limited operation to be use by part handler during interruption
1275 """A limited operation to be use by part handler during interruption
1276
1276
1277 It only have access to an ui object.
1277 It only have access to an ui object.
1278 """
1278 """
1279
1279
1280 def __init__(self, ui):
1280 def __init__(self, ui):
1281 self.ui = ui
1281 self.ui = ui
1282 self.reply = None
1282 self.reply = None
1283 self.captureoutput = False
1283 self.captureoutput = False
1284
1284
1285 @property
1285 @property
1286 def repo(self):
1286 def repo(self):
1287 raise error.ProgrammingError(b'no repo access from stream interruption')
1287 raise error.ProgrammingError(b'no repo access from stream interruption')
1288
1288
1289 def gettransaction(self):
1289 def gettransaction(self):
1290 raise TransactionUnavailable(b'no repo access from stream interruption')
1290 raise TransactionUnavailable(b'no repo access from stream interruption')
1291
1291
1292
1292
1293 def decodepayloadchunks(ui, fh):
1293 def decodepayloadchunks(ui, fh):
1294 """Reads bundle2 part payload data into chunks.
1294 """Reads bundle2 part payload data into chunks.
1295
1295
1296 Part payload data consists of framed chunks. This function takes
1296 Part payload data consists of framed chunks. This function takes
1297 a file handle and emits those chunks.
1297 a file handle and emits those chunks.
1298 """
1298 """
1299 dolog = ui.configbool(b'devel', b'bundle2.debug')
1299 dolog = ui.configbool(b'devel', b'bundle2.debug')
1300 debug = ui.debug
1300 debug = ui.debug
1301
1301
1302 headerstruct = struct.Struct(_fpayloadsize)
1302 headerstruct = struct.Struct(_fpayloadsize)
1303 headersize = headerstruct.size
1303 headersize = headerstruct.size
1304 unpack = headerstruct.unpack
1304 unpack = headerstruct.unpack
1305
1305
1306 readexactly = changegroup.readexactly
1306 readexactly = changegroup.readexactly
1307 read = fh.read
1307 read = fh.read
1308
1308
1309 chunksize = unpack(readexactly(fh, headersize))[0]
1309 chunksize = unpack(readexactly(fh, headersize))[0]
1310 indebug(ui, b'payload chunk size: %i' % chunksize)
1310 indebug(ui, b'payload chunk size: %i' % chunksize)
1311
1311
1312 # changegroup.readexactly() is inlined below for performance.
1312 # changegroup.readexactly() is inlined below for performance.
1313 while chunksize:
1313 while chunksize:
1314 if chunksize >= 0:
1314 if chunksize >= 0:
1315 s = read(chunksize)
1315 s = read(chunksize)
1316 if len(s) < chunksize:
1316 if len(s) < chunksize:
1317 raise error.Abort(
1317 raise error.Abort(
1318 _(
1318 _(
1319 b'stream ended unexpectedly '
1319 b'stream ended unexpectedly '
1320 b' (got %d bytes, expected %d)'
1320 b' (got %d bytes, expected %d)'
1321 )
1321 )
1322 % (len(s), chunksize)
1322 % (len(s), chunksize)
1323 )
1323 )
1324
1324
1325 yield s
1325 yield s
1326 elif chunksize == flaginterrupt:
1326 elif chunksize == flaginterrupt:
1327 # Interrupt "signal" detected. The regular stream is interrupted
1327 # Interrupt "signal" detected. The regular stream is interrupted
1328 # and a bundle2 part follows. Consume it.
1328 # and a bundle2 part follows. Consume it.
1329 interrupthandler(ui, fh)()
1329 interrupthandler(ui, fh)()
1330 else:
1330 else:
1331 raise error.BundleValueError(
1331 raise error.BundleValueError(
1332 b'negative payload chunk size: %s' % chunksize
1332 b'negative payload chunk size: %s' % chunksize
1333 )
1333 )
1334
1334
1335 s = read(headersize)
1335 s = read(headersize)
1336 if len(s) < headersize:
1336 if len(s) < headersize:
1337 raise error.Abort(
1337 raise error.Abort(
1338 _(b'stream ended unexpectedly (got %d bytes, expected %d)')
1338 _(b'stream ended unexpectedly (got %d bytes, expected %d)')
1339 % (len(s), chunksize)
1339 % (len(s), chunksize)
1340 )
1340 )
1341
1341
1342 chunksize = unpack(s)[0]
1342 chunksize = unpack(s)[0]
1343
1343
1344 # indebug() inlined for performance.
1344 # indebug() inlined for performance.
1345 if dolog:
1345 if dolog:
1346 debug(b'bundle2-input: payload chunk size: %i\n' % chunksize)
1346 debug(b'bundle2-input: payload chunk size: %i\n' % chunksize)
1347
1347
1348
1348
1349 class unbundlepart(unpackermixin):
1349 class unbundlepart(unpackermixin):
1350 """a bundle part read from a bundle"""
1350 """a bundle part read from a bundle"""
1351
1351
1352 def __init__(self, ui, header, fp):
1352 def __init__(self, ui, header, fp):
1353 super(unbundlepart, self).__init__(fp)
1353 super(unbundlepart, self).__init__(fp)
1354 self._seekable = util.safehasattr(fp, 'seek') and util.safehasattr(
1354 self._seekable = util.safehasattr(fp, 'seek') and util.safehasattr(
1355 fp, b'tell'
1355 fp, b'tell'
1356 )
1356 )
1357 self.ui = ui
1357 self.ui = ui
1358 # unbundle state attr
1358 # unbundle state attr
1359 self._headerdata = header
1359 self._headerdata = header
1360 self._headeroffset = 0
1360 self._headeroffset = 0
1361 self._initialized = False
1361 self._initialized = False
1362 self.consumed = False
1362 self.consumed = False
1363 # part data
1363 # part data
1364 self.id = None
1364 self.id = None
1365 self.type = None
1365 self.type = None
1366 self.mandatoryparams = None
1366 self.mandatoryparams = None
1367 self.advisoryparams = None
1367 self.advisoryparams = None
1368 self.params = None
1368 self.params = None
1369 self.mandatorykeys = ()
1369 self.mandatorykeys = ()
1370 self._readheader()
1370 self._readheader()
1371 self._mandatory = None
1371 self._mandatory = None
1372 self._pos = 0
1372 self._pos = 0
1373
1373
1374 def _fromheader(self, size):
1374 def _fromheader(self, size):
1375 """return the next <size> byte from the header"""
1375 """return the next <size> byte from the header"""
1376 offset = self._headeroffset
1376 offset = self._headeroffset
1377 data = self._headerdata[offset : (offset + size)]
1377 data = self._headerdata[offset : (offset + size)]
1378 self._headeroffset = offset + size
1378 self._headeroffset = offset + size
1379 return data
1379 return data
1380
1380
1381 def _unpackheader(self, format):
1381 def _unpackheader(self, format):
1382 """read given format from header
1382 """read given format from header
1383
1383
1384 This automatically compute the size of the format to read."""
1384 This automatically compute the size of the format to read."""
1385 data = self._fromheader(struct.calcsize(format))
1385 data = self._fromheader(struct.calcsize(format))
1386 return _unpack(format, data)
1386 return _unpack(format, data)
1387
1387
1388 def _initparams(self, mandatoryparams, advisoryparams):
1388 def _initparams(self, mandatoryparams, advisoryparams):
1389 """internal function to setup all logic related parameters"""
1389 """internal function to setup all logic related parameters"""
1390 # make it read only to prevent people touching it by mistake.
1390 # make it read only to prevent people touching it by mistake.
1391 self.mandatoryparams = tuple(mandatoryparams)
1391 self.mandatoryparams = tuple(mandatoryparams)
1392 self.advisoryparams = tuple(advisoryparams)
1392 self.advisoryparams = tuple(advisoryparams)
1393 # user friendly UI
1393 # user friendly UI
1394 self.params = util.sortdict(self.mandatoryparams)
1394 self.params = util.sortdict(self.mandatoryparams)
1395 self.params.update(self.advisoryparams)
1395 self.params.update(self.advisoryparams)
1396 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1396 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1397
1397
1398 def _readheader(self):
1398 def _readheader(self):
1399 """read the header and setup the object"""
1399 """read the header and setup the object"""
1400 typesize = self._unpackheader(_fparttypesize)[0]
1400 typesize = self._unpackheader(_fparttypesize)[0]
1401 self.type = self._fromheader(typesize)
1401 self.type = self._fromheader(typesize)
1402 indebug(self.ui, b'part type: "%s"' % self.type)
1402 indebug(self.ui, b'part type: "%s"' % self.type)
1403 self.id = self._unpackheader(_fpartid)[0]
1403 self.id = self._unpackheader(_fpartid)[0]
1404 indebug(self.ui, b'part id: "%s"' % pycompat.bytestr(self.id))
1404 indebug(self.ui, b'part id: "%s"' % pycompat.bytestr(self.id))
1405 # extract mandatory bit from type
1405 # extract mandatory bit from type
1406 self.mandatory = self.type != self.type.lower()
1406 self.mandatory = self.type != self.type.lower()
1407 self.type = self.type.lower()
1407 self.type = self.type.lower()
1408 ## reading parameters
1408 ## reading parameters
1409 # param count
1409 # param count
1410 mancount, advcount = self._unpackheader(_fpartparamcount)
1410 mancount, advcount = self._unpackheader(_fpartparamcount)
1411 indebug(self.ui, b'part parameters: %i' % (mancount + advcount))
1411 indebug(self.ui, b'part parameters: %i' % (mancount + advcount))
1412 # param size
1412 # param size
1413 fparamsizes = _makefpartparamsizes(mancount + advcount)
1413 fparamsizes = _makefpartparamsizes(mancount + advcount)
1414 paramsizes = self._unpackheader(fparamsizes)
1414 paramsizes = self._unpackheader(fparamsizes)
1415 # make it a list of couple again
1415 # make it a list of couple again
1416 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1416 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1417 # split mandatory from advisory
1417 # split mandatory from advisory
1418 mansizes = paramsizes[:mancount]
1418 mansizes = paramsizes[:mancount]
1419 advsizes = paramsizes[mancount:]
1419 advsizes = paramsizes[mancount:]
1420 # retrieve param value
1420 # retrieve param value
1421 manparams = []
1421 manparams = []
1422 for key, value in mansizes:
1422 for key, value in mansizes:
1423 manparams.append((self._fromheader(key), self._fromheader(value)))
1423 manparams.append((self._fromheader(key), self._fromheader(value)))
1424 advparams = []
1424 advparams = []
1425 for key, value in advsizes:
1425 for key, value in advsizes:
1426 advparams.append((self._fromheader(key), self._fromheader(value)))
1426 advparams.append((self._fromheader(key), self._fromheader(value)))
1427 self._initparams(manparams, advparams)
1427 self._initparams(manparams, advparams)
1428 ## part payload
1428 ## part payload
1429 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1429 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1430 # we read the data, tell it
1430 # we read the data, tell it
1431 self._initialized = True
1431 self._initialized = True
1432
1432
1433 def _payloadchunks(self):
1433 def _payloadchunks(self):
1434 """Generator of decoded chunks in the payload."""
1434 """Generator of decoded chunks in the payload."""
1435 return decodepayloadchunks(self.ui, self._fp)
1435 return decodepayloadchunks(self.ui, self._fp)
1436
1436
1437 def consume(self):
1437 def consume(self):
1438 """Read the part payload until completion.
1438 """Read the part payload until completion.
1439
1439
1440 By consuming the part data, the underlying stream read offset will
1440 By consuming the part data, the underlying stream read offset will
1441 be advanced to the next part (or end of stream).
1441 be advanced to the next part (or end of stream).
1442 """
1442 """
1443 if self.consumed:
1443 if self.consumed:
1444 return
1444 return
1445
1445
1446 chunk = self.read(32768)
1446 chunk = self.read(32768)
1447 while chunk:
1447 while chunk:
1448 self._pos += len(chunk)
1448 self._pos += len(chunk)
1449 chunk = self.read(32768)
1449 chunk = self.read(32768)
1450
1450
1451 def read(self, size=None):
1451 def read(self, size=None):
1452 """read payload data"""
1452 """read payload data"""
1453 if not self._initialized:
1453 if not self._initialized:
1454 self._readheader()
1454 self._readheader()
1455 if size is None:
1455 if size is None:
1456 data = self._payloadstream.read()
1456 data = self._payloadstream.read()
1457 else:
1457 else:
1458 data = self._payloadstream.read(size)
1458 data = self._payloadstream.read(size)
1459 self._pos += len(data)
1459 self._pos += len(data)
1460 if size is None or len(data) < size:
1460 if size is None or len(data) < size:
1461 if not self.consumed and self._pos:
1461 if not self.consumed and self._pos:
1462 self.ui.debug(
1462 self.ui.debug(
1463 b'bundle2-input-part: total payload size %i\n' % self._pos
1463 b'bundle2-input-part: total payload size %i\n' % self._pos
1464 )
1464 )
1465 self.consumed = True
1465 self.consumed = True
1466 return data
1466 return data
1467
1467
1468
1468
1469 class seekableunbundlepart(unbundlepart):
1469 class seekableunbundlepart(unbundlepart):
1470 """A bundle2 part in a bundle that is seekable.
1470 """A bundle2 part in a bundle that is seekable.
1471
1471
1472 Regular ``unbundlepart`` instances can only be read once. This class
1472 Regular ``unbundlepart`` instances can only be read once. This class
1473 extends ``unbundlepart`` to enable bi-directional seeking within the
1473 extends ``unbundlepart`` to enable bi-directional seeking within the
1474 part.
1474 part.
1475
1475
1476 Bundle2 part data consists of framed chunks. Offsets when seeking
1476 Bundle2 part data consists of framed chunks. Offsets when seeking
1477 refer to the decoded data, not the offsets in the underlying bundle2
1477 refer to the decoded data, not the offsets in the underlying bundle2
1478 stream.
1478 stream.
1479
1479
1480 To facilitate quickly seeking within the decoded data, instances of this
1480 To facilitate quickly seeking within the decoded data, instances of this
1481 class maintain a mapping between offsets in the underlying stream and
1481 class maintain a mapping between offsets in the underlying stream and
1482 the decoded payload. This mapping will consume memory in proportion
1482 the decoded payload. This mapping will consume memory in proportion
1483 to the number of chunks within the payload (which almost certainly
1483 to the number of chunks within the payload (which almost certainly
1484 increases in proportion with the size of the part).
1484 increases in proportion with the size of the part).
1485 """
1485 """
1486
1486
1487 def __init__(self, ui, header, fp):
1487 def __init__(self, ui, header, fp):
1488 # (payload, file) offsets for chunk starts.
1488 # (payload, file) offsets for chunk starts.
1489 self._chunkindex = []
1489 self._chunkindex = []
1490
1490
1491 super(seekableunbundlepart, self).__init__(ui, header, fp)
1491 super(seekableunbundlepart, self).__init__(ui, header, fp)
1492
1492
1493 def _payloadchunks(self, chunknum=0):
1493 def _payloadchunks(self, chunknum=0):
1494 '''seek to specified chunk and start yielding data'''
1494 '''seek to specified chunk and start yielding data'''
1495 if len(self._chunkindex) == 0:
1495 if len(self._chunkindex) == 0:
1496 assert chunknum == 0, b'Must start with chunk 0'
1496 assert chunknum == 0, b'Must start with chunk 0'
1497 self._chunkindex.append((0, self._tellfp()))
1497 self._chunkindex.append((0, self._tellfp()))
1498 else:
1498 else:
1499 assert chunknum < len(self._chunkindex), (
1499 assert chunknum < len(self._chunkindex), (
1500 b'Unknown chunk %d' % chunknum
1500 b'Unknown chunk %d' % chunknum
1501 )
1501 )
1502 self._seekfp(self._chunkindex[chunknum][1])
1502 self._seekfp(self._chunkindex[chunknum][1])
1503
1503
1504 pos = self._chunkindex[chunknum][0]
1504 pos = self._chunkindex[chunknum][0]
1505
1505
1506 for chunk in decodepayloadchunks(self.ui, self._fp):
1506 for chunk in decodepayloadchunks(self.ui, self._fp):
1507 chunknum += 1
1507 chunknum += 1
1508 pos += len(chunk)
1508 pos += len(chunk)
1509 if chunknum == len(self._chunkindex):
1509 if chunknum == len(self._chunkindex):
1510 self._chunkindex.append((pos, self._tellfp()))
1510 self._chunkindex.append((pos, self._tellfp()))
1511
1511
1512 yield chunk
1512 yield chunk
1513
1513
1514 def _findchunk(self, pos):
1514 def _findchunk(self, pos):
1515 '''for a given payload position, return a chunk number and offset'''
1515 '''for a given payload position, return a chunk number and offset'''
1516 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1516 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1517 if ppos == pos:
1517 if ppos == pos:
1518 return chunk, 0
1518 return chunk, 0
1519 elif ppos > pos:
1519 elif ppos > pos:
1520 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1520 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1521 raise ValueError(b'Unknown chunk')
1521 raise ValueError(b'Unknown chunk')
1522
1522
1523 def tell(self):
1523 def tell(self):
1524 return self._pos
1524 return self._pos
1525
1525
1526 def seek(self, offset, whence=os.SEEK_SET):
1526 def seek(self, offset, whence=os.SEEK_SET):
1527 if whence == os.SEEK_SET:
1527 if whence == os.SEEK_SET:
1528 newpos = offset
1528 newpos = offset
1529 elif whence == os.SEEK_CUR:
1529 elif whence == os.SEEK_CUR:
1530 newpos = self._pos + offset
1530 newpos = self._pos + offset
1531 elif whence == os.SEEK_END:
1531 elif whence == os.SEEK_END:
1532 if not self.consumed:
1532 if not self.consumed:
1533 # Can't use self.consume() here because it advances self._pos.
1533 # Can't use self.consume() here because it advances self._pos.
1534 chunk = self.read(32768)
1534 chunk = self.read(32768)
1535 while chunk:
1535 while chunk:
1536 chunk = self.read(32768)
1536 chunk = self.read(32768)
1537 newpos = self._chunkindex[-1][0] - offset
1537 newpos = self._chunkindex[-1][0] - offset
1538 else:
1538 else:
1539 raise ValueError(b'Unknown whence value: %r' % (whence,))
1539 raise ValueError(b'Unknown whence value: %r' % (whence,))
1540
1540
1541 if newpos > self._chunkindex[-1][0] and not self.consumed:
1541 if newpos > self._chunkindex[-1][0] and not self.consumed:
1542 # Can't use self.consume() here because it advances self._pos.
1542 # Can't use self.consume() here because it advances self._pos.
1543 chunk = self.read(32768)
1543 chunk = self.read(32768)
1544 while chunk:
1544 while chunk:
1545 chunk = self.read(32668)
1545 chunk = self.read(32668)
1546
1546
1547 if not 0 <= newpos <= self._chunkindex[-1][0]:
1547 if not 0 <= newpos <= self._chunkindex[-1][0]:
1548 raise ValueError(b'Offset out of range')
1548 raise ValueError(b'Offset out of range')
1549
1549
1550 if self._pos != newpos:
1550 if self._pos != newpos:
1551 chunk, internaloffset = self._findchunk(newpos)
1551 chunk, internaloffset = self._findchunk(newpos)
1552 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1552 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1553 adjust = self.read(internaloffset)
1553 adjust = self.read(internaloffset)
1554 if len(adjust) != internaloffset:
1554 if len(adjust) != internaloffset:
1555 raise error.Abort(_(b'Seek failed\n'))
1555 raise error.Abort(_(b'Seek failed\n'))
1556 self._pos = newpos
1556 self._pos = newpos
1557
1557
1558 def _seekfp(self, offset, whence=0):
1558 def _seekfp(self, offset, whence=0):
1559 """move the underlying file pointer
1559 """move the underlying file pointer
1560
1560
1561 This method is meant for internal usage by the bundle2 protocol only.
1561 This method is meant for internal usage by the bundle2 protocol only.
1562 They directly manipulate the low level stream including bundle2 level
1562 They directly manipulate the low level stream including bundle2 level
1563 instruction.
1563 instruction.
1564
1564
1565 Do not use it to implement higher-level logic or methods."""
1565 Do not use it to implement higher-level logic or methods."""
1566 if self._seekable:
1566 if self._seekable:
1567 return self._fp.seek(offset, whence)
1567 return self._fp.seek(offset, whence)
1568 else:
1568 else:
1569 raise NotImplementedError(_(b'File pointer is not seekable'))
1569 raise NotImplementedError(_(b'File pointer is not seekable'))
1570
1570
1571 def _tellfp(self):
1571 def _tellfp(self):
1572 """return the file offset, or None if file is not seekable
1572 """return the file offset, or None if file is not seekable
1573
1573
1574 This method is meant for internal usage by the bundle2 protocol only.
1574 This method is meant for internal usage by the bundle2 protocol only.
1575 They directly manipulate the low level stream including bundle2 level
1575 They directly manipulate the low level stream including bundle2 level
1576 instruction.
1576 instruction.
1577
1577
1578 Do not use it to implement higher-level logic or methods."""
1578 Do not use it to implement higher-level logic or methods."""
1579 if self._seekable:
1579 if self._seekable:
1580 try:
1580 try:
1581 return self._fp.tell()
1581 return self._fp.tell()
1582 except IOError as e:
1582 except IOError as e:
1583 if e.errno == errno.ESPIPE:
1583 if e.errno == errno.ESPIPE:
1584 self._seekable = False
1584 self._seekable = False
1585 else:
1585 else:
1586 raise
1586 raise
1587 return None
1587 return None
1588
1588
1589
1589
1590 # These are only the static capabilities.
1590 # These are only the static capabilities.
1591 # Check the 'getrepocaps' function for the rest.
1591 # Check the 'getrepocaps' function for the rest.
1592 capabilities = {
1592 capabilities = {
1593 b'HG20': (),
1593 b'HG20': (),
1594 b'bookmarks': (),
1594 b'bookmarks': (),
1595 b'error': (b'abort', b'unsupportedcontent', b'pushraced', b'pushkey'),
1595 b'error': (b'abort', b'unsupportedcontent', b'pushraced', b'pushkey'),
1596 b'listkeys': (),
1596 b'listkeys': (),
1597 b'pushkey': (),
1597 b'pushkey': (),
1598 b'digests': tuple(sorted(util.DIGESTS.keys())),
1598 b'digests': tuple(sorted(util.DIGESTS.keys())),
1599 b'remote-changegroup': (b'http', b'https'),
1599 b'remote-changegroup': (b'http', b'https'),
1600 b'hgtagsfnodes': (),
1600 b'hgtagsfnodes': (),
1601 b'rev-branch-cache': (),
1601 b'rev-branch-cache': (),
1602 b'phases': (b'heads',),
1602 b'phases': (b'heads',),
1603 b'stream': (b'v2',),
1603 b'stream': (b'v2',),
1604 }
1604 }
1605
1605
1606
1606
1607 def getrepocaps(repo, allowpushback=False, role=None):
1607 def getrepocaps(repo, allowpushback=False, role=None):
1608 """return the bundle2 capabilities for a given repo
1608 """return the bundle2 capabilities for a given repo
1609
1609
1610 Exists to allow extensions (like evolution) to mutate the capabilities.
1610 Exists to allow extensions (like evolution) to mutate the capabilities.
1611
1611
1612 The returned value is used for servers advertising their capabilities as
1612 The returned value is used for servers advertising their capabilities as
1613 well as clients advertising their capabilities to servers as part of
1613 well as clients advertising their capabilities to servers as part of
1614 bundle2 requests. The ``role`` argument specifies which is which.
1614 bundle2 requests. The ``role`` argument specifies which is which.
1615 """
1615 """
1616 if role not in (b'client', b'server'):
1616 if role not in (b'client', b'server'):
1617 raise error.ProgrammingError(b'role argument must be client or server')
1617 raise error.ProgrammingError(b'role argument must be client or server')
1618
1618
1619 caps = capabilities.copy()
1619 caps = capabilities.copy()
1620 caps[b'changegroup'] = tuple(
1620 caps[b'changegroup'] = tuple(
1621 sorted(changegroup.supportedincomingversions(repo))
1621 sorted(changegroup.supportedincomingversions(repo))
1622 )
1622 )
1623 if obsolete.isenabled(repo, obsolete.exchangeopt):
1623 if obsolete.isenabled(repo, obsolete.exchangeopt):
1624 supportedformat = tuple(b'V%i' % v for v in obsolete.formats)
1624 supportedformat = tuple(b'V%i' % v for v in obsolete.formats)
1625 caps[b'obsmarkers'] = supportedformat
1625 caps[b'obsmarkers'] = supportedformat
1626 if allowpushback:
1626 if allowpushback:
1627 caps[b'pushback'] = ()
1627 caps[b'pushback'] = ()
1628 cpmode = repo.ui.config(b'server', b'concurrent-push-mode')
1628 cpmode = repo.ui.config(b'server', b'concurrent-push-mode')
1629 if cpmode == b'check-related':
1629 if cpmode == b'check-related':
1630 caps[b'checkheads'] = (b'related',)
1630 caps[b'checkheads'] = (b'related',)
1631 if b'phases' in repo.ui.configlist(b'devel', b'legacy.exchange'):
1631 if b'phases' in repo.ui.configlist(b'devel', b'legacy.exchange'):
1632 caps.pop(b'phases')
1632 caps.pop(b'phases')
1633
1633
1634 # Don't advertise stream clone support in server mode if not configured.
1634 # Don't advertise stream clone support in server mode if not configured.
1635 if role == b'server':
1635 if role == b'server':
1636 streamsupported = repo.ui.configbool(
1636 streamsupported = repo.ui.configbool(
1637 b'server', b'uncompressed', untrusted=True
1637 b'server', b'uncompressed', untrusted=True
1638 )
1638 )
1639 featuresupported = repo.ui.configbool(b'server', b'bundle2.stream')
1639 featuresupported = repo.ui.configbool(b'server', b'bundle2.stream')
1640
1640
1641 if not streamsupported or not featuresupported:
1641 if not streamsupported or not featuresupported:
1642 caps.pop(b'stream')
1642 caps.pop(b'stream')
1643 # Else always advertise support on client, because payload support
1643 # Else always advertise support on client, because payload support
1644 # should always be advertised.
1644 # should always be advertised.
1645
1645
1646 return caps
1646 return caps
1647
1647
1648
1648
1649 def bundle2caps(remote):
1649 def bundle2caps(remote):
1650 """return the bundle capabilities of a peer as dict"""
1650 """return the bundle capabilities of a peer as dict"""
1651 raw = remote.capable(b'bundle2')
1651 raw = remote.capable(b'bundle2')
1652 if not raw and raw != b'':
1652 if not raw and raw != b'':
1653 return {}
1653 return {}
1654 capsblob = urlreq.unquote(remote.capable(b'bundle2'))
1654 capsblob = urlreq.unquote(remote.capable(b'bundle2'))
1655 return decodecaps(capsblob)
1655 return decodecaps(capsblob)
1656
1656
1657
1657
1658 def obsmarkersversion(caps):
1658 def obsmarkersversion(caps):
1659 """extract the list of supported obsmarkers versions from a bundle2caps dict"""
1659 """extract the list of supported obsmarkers versions from a bundle2caps dict"""
1660 obscaps = caps.get(b'obsmarkers', ())
1660 obscaps = caps.get(b'obsmarkers', ())
1661 return [int(c[1:]) for c in obscaps if c.startswith(b'V')]
1661 return [int(c[1:]) for c in obscaps if c.startswith(b'V')]
1662
1662
1663
1663
1664 def writenewbundle(
1664 def writenewbundle(
1665 ui,
1665 ui,
1666 repo,
1666 repo,
1667 source,
1667 source,
1668 filename,
1668 filename,
1669 bundletype,
1669 bundletype,
1670 outgoing,
1670 outgoing,
1671 opts,
1671 opts,
1672 vfs=None,
1672 vfs=None,
1673 compression=None,
1673 compression=None,
1674 compopts=None,
1674 compopts=None,
1675 ):
1675 ):
1676 if bundletype.startswith(b'HG10'):
1676 if bundletype.startswith(b'HG10'):
1677 cg = changegroup.makechangegroup(repo, outgoing, b'01', source)
1677 cg = changegroup.makechangegroup(repo, outgoing, b'01', source)
1678 return writebundle(
1678 return writebundle(
1679 ui,
1679 ui,
1680 cg,
1680 cg,
1681 filename,
1681 filename,
1682 bundletype,
1682 bundletype,
1683 vfs=vfs,
1683 vfs=vfs,
1684 compression=compression,
1684 compression=compression,
1685 compopts=compopts,
1685 compopts=compopts,
1686 )
1686 )
1687 elif not bundletype.startswith(b'HG20'):
1687 elif not bundletype.startswith(b'HG20'):
1688 raise error.ProgrammingError(b'unknown bundle type: %s' % bundletype)
1688 raise error.ProgrammingError(b'unknown bundle type: %s' % bundletype)
1689
1689
1690 caps = {}
1690 caps = {}
1691 if b'obsolescence' in opts:
1691 if b'obsolescence' in opts:
1692 caps[b'obsmarkers'] = (b'V1',)
1692 caps[b'obsmarkers'] = (b'V1',)
1693 bundle = bundle20(ui, caps)
1693 bundle = bundle20(ui, caps)
1694 bundle.setcompression(compression, compopts)
1694 bundle.setcompression(compression, compopts)
1695 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1695 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1696 chunkiter = bundle.getchunks()
1696 chunkiter = bundle.getchunks()
1697
1697
1698 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1698 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1699
1699
1700
1700
1701 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1701 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1702 # We should eventually reconcile this logic with the one behind
1702 # We should eventually reconcile this logic with the one behind
1703 # 'exchange.getbundle2partsgenerator'.
1703 # 'exchange.getbundle2partsgenerator'.
1704 #
1704 #
1705 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1705 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1706 # different right now. So we keep them separated for now for the sake of
1706 # different right now. So we keep them separated for now for the sake of
1707 # simplicity.
1707 # simplicity.
1708
1708
1709 # we might not always want a changegroup in such bundle, for example in
1709 # we might not always want a changegroup in such bundle, for example in
1710 # stream bundles
1710 # stream bundles
1711 if opts.get(b'changegroup', True):
1711 if opts.get(b'changegroup', True):
1712 cgversion = opts.get(b'cg.version')
1712 cgversion = opts.get(b'cg.version')
1713 if cgversion is None:
1713 if cgversion is None:
1714 cgversion = changegroup.safeversion(repo)
1714 cgversion = changegroup.safeversion(repo)
1715 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1715 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1716 part = bundler.newpart(b'changegroup', data=cg.getchunks())
1716 part = bundler.newpart(b'changegroup', data=cg.getchunks())
1717 part.addparam(b'version', cg.version)
1717 part.addparam(b'version', cg.version)
1718 if b'clcount' in cg.extras:
1718 if b'clcount' in cg.extras:
1719 part.addparam(
1719 part.addparam(
1720 b'nbchanges', b'%d' % cg.extras[b'clcount'], mandatory=False
1720 b'nbchanges', b'%d' % cg.extras[b'clcount'], mandatory=False
1721 )
1721 )
1722 if opts.get(b'phases') and repo.revs(
1722 if opts.get(b'phases') and repo.revs(
1723 b'%ln and secret()', outgoing.ancestorsof
1723 b'%ln and secret()', outgoing.ancestorsof
1724 ):
1724 ):
1725 part.addparam(
1725 part.addparam(
1726 b'targetphase', b'%d' % phases.secret, mandatory=False
1726 b'targetphase', b'%d' % phases.secret, mandatory=False
1727 )
1727 )
1728 if b'exp-sidedata-flag' in repo.requirements:
1728 if b'exp-sidedata-flag' in repo.requirements:
1729 part.addparam(b'exp-sidedata', b'1')
1729 part.addparam(b'exp-sidedata', b'1')
1730
1730
1731 if opts.get(b'streamv2', False):
1731 if opts.get(b'streamv2', False):
1732 addpartbundlestream2(bundler, repo, stream=True)
1732 addpartbundlestream2(bundler, repo, stream=True)
1733
1733
1734 if opts.get(b'tagsfnodescache', True):
1734 if opts.get(b'tagsfnodescache', True):
1735 addparttagsfnodescache(repo, bundler, outgoing)
1735 addparttagsfnodescache(repo, bundler, outgoing)
1736
1736
1737 if opts.get(b'revbranchcache', True):
1737 if opts.get(b'revbranchcache', True):
1738 addpartrevbranchcache(repo, bundler, outgoing)
1738 addpartrevbranchcache(repo, bundler, outgoing)
1739
1739
1740 if opts.get(b'obsolescence', False):
1740 if opts.get(b'obsolescence', False):
1741 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1741 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1742 buildobsmarkerspart(
1742 buildobsmarkerspart(
1743 bundler,
1743 bundler,
1744 obsmarkers,
1744 obsmarkers,
1745 mandatory=opts.get(b'obsolescence-mandatory', True),
1745 mandatory=opts.get(b'obsolescence-mandatory', True),
1746 )
1746 )
1747
1747
1748 if opts.get(b'phases', False):
1748 if opts.get(b'phases', False):
1749 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1749 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1750 phasedata = phases.binaryencode(headsbyphase)
1750 phasedata = phases.binaryencode(headsbyphase)
1751 bundler.newpart(b'phase-heads', data=phasedata)
1751 bundler.newpart(b'phase-heads', data=phasedata)
1752
1752
1753
1753
1754 def addparttagsfnodescache(repo, bundler, outgoing):
1754 def addparttagsfnodescache(repo, bundler, outgoing):
1755 # we include the tags fnode cache for the bundle changeset
1755 # we include the tags fnode cache for the bundle changeset
1756 # (as an optional parts)
1756 # (as an optional parts)
1757 cache = tags.hgtagsfnodescache(repo.unfiltered())
1757 cache = tags.hgtagsfnodescache(repo.unfiltered())
1758 chunks = []
1758 chunks = []
1759
1759
1760 # .hgtags fnodes are only relevant for head changesets. While we could
1760 # .hgtags fnodes are only relevant for head changesets. While we could
1761 # transfer values for all known nodes, there will likely be little to
1761 # transfer values for all known nodes, there will likely be little to
1762 # no benefit.
1762 # no benefit.
1763 #
1763 #
1764 # We don't bother using a generator to produce output data because
1764 # We don't bother using a generator to produce output data because
1765 # a) we only have 40 bytes per head and even esoteric numbers of heads
1765 # a) we only have 40 bytes per head and even esoteric numbers of heads
1766 # consume little memory (1M heads is 40MB) b) we don't want to send the
1766 # consume little memory (1M heads is 40MB) b) we don't want to send the
1767 # part if we don't have entries and knowing if we have entries requires
1767 # part if we don't have entries and knowing if we have entries requires
1768 # cache lookups.
1768 # cache lookups.
1769 for node in outgoing.ancestorsof:
1769 for node in outgoing.ancestorsof:
1770 # Don't compute missing, as this may slow down serving.
1770 # Don't compute missing, as this may slow down serving.
1771 fnode = cache.getfnode(node, computemissing=False)
1771 fnode = cache.getfnode(node, computemissing=False)
1772 if fnode is not None:
1772 if fnode is not None:
1773 chunks.extend([node, fnode])
1773 chunks.extend([node, fnode])
1774
1774
1775 if chunks:
1775 if chunks:
1776 bundler.newpart(b'hgtagsfnodes', data=b''.join(chunks))
1776 bundler.newpart(b'hgtagsfnodes', data=b''.join(chunks))
1777
1777
1778
1778
1779 def addpartrevbranchcache(repo, bundler, outgoing):
1779 def addpartrevbranchcache(repo, bundler, outgoing):
1780 # we include the rev branch cache for the bundle changeset
1780 # we include the rev branch cache for the bundle changeset
1781 # (as an optional parts)
1781 # (as an optional parts)
1782 cache = repo.revbranchcache()
1782 cache = repo.revbranchcache()
1783 cl = repo.unfiltered().changelog
1783 cl = repo.unfiltered().changelog
1784 branchesdata = collections.defaultdict(lambda: (set(), set()))
1784 branchesdata = collections.defaultdict(lambda: (set(), set()))
1785 for node in outgoing.missing:
1785 for node in outgoing.missing:
1786 branch, close = cache.branchinfo(cl.rev(node))
1786 branch, close = cache.branchinfo(cl.rev(node))
1787 branchesdata[branch][close].add(node)
1787 branchesdata[branch][close].add(node)
1788
1788
1789 def generate():
1789 def generate():
1790 for branch, (nodes, closed) in sorted(branchesdata.items()):
1790 for branch, (nodes, closed) in sorted(branchesdata.items()):
1791 utf8branch = encoding.fromlocal(branch)
1791 utf8branch = encoding.fromlocal(branch)
1792 yield rbcstruct.pack(len(utf8branch), len(nodes), len(closed))
1792 yield rbcstruct.pack(len(utf8branch), len(nodes), len(closed))
1793 yield utf8branch
1793 yield utf8branch
1794 for n in sorted(nodes):
1794 for n in sorted(nodes):
1795 yield n
1795 yield n
1796 for n in sorted(closed):
1796 for n in sorted(closed):
1797 yield n
1797 yield n
1798
1798
1799 bundler.newpart(b'cache:rev-branch-cache', data=generate(), mandatory=False)
1799 bundler.newpart(b'cache:rev-branch-cache', data=generate(), mandatory=False)
1800
1800
1801
1801
1802 def _formatrequirementsspec(requirements):
1802 def _formatrequirementsspec(requirements):
1803 requirements = [req for req in requirements if req != b"shared"]
1803 requirements = [req for req in requirements if req != b"shared"]
1804 return urlreq.quote(b','.join(sorted(requirements)))
1804 return urlreq.quote(b','.join(sorted(requirements)))
1805
1805
1806
1806
1807 def _formatrequirementsparams(requirements):
1807 def _formatrequirementsparams(requirements):
1808 requirements = _formatrequirementsspec(requirements)
1808 requirements = _formatrequirementsspec(requirements)
1809 params = b"%s%s" % (urlreq.quote(b"requirements="), requirements)
1809 params = b"%s%s" % (urlreq.quote(b"requirements="), requirements)
1810 return params
1810 return params
1811
1811
1812
1812
1813 def addpartbundlestream2(bundler, repo, **kwargs):
1813 def addpartbundlestream2(bundler, repo, **kwargs):
1814 if not kwargs.get('stream', False):
1814 if not kwargs.get('stream', False):
1815 return
1815 return
1816
1816
1817 if not streamclone.allowservergeneration(repo):
1817 if not streamclone.allowservergeneration(repo):
1818 raise error.Abort(
1818 raise error.Abort(
1819 _(
1819 _(
1820 b'stream data requested but server does not allow '
1820 b'stream data requested but server does not allow '
1821 b'this feature'
1821 b'this feature'
1822 ),
1822 ),
1823 hint=_(
1823 hint=_(
1824 b'well-behaved clients should not be '
1824 b'well-behaved clients should not be '
1825 b'requesting stream data from servers not '
1825 b'requesting stream data from servers not '
1826 b'advertising it; the client may be buggy'
1826 b'advertising it; the client may be buggy'
1827 ),
1827 ),
1828 )
1828 )
1829
1829
1830 # Stream clones don't compress well. And compression undermines a
1830 # Stream clones don't compress well. And compression undermines a
1831 # goal of stream clones, which is to be fast. Communicate the desire
1831 # goal of stream clones, which is to be fast. Communicate the desire
1832 # to avoid compression to consumers of the bundle.
1832 # to avoid compression to consumers of the bundle.
1833 bundler.prefercompressed = False
1833 bundler.prefercompressed = False
1834
1834
1835 # get the includes and excludes
1835 # get the includes and excludes
1836 includepats = kwargs.get('includepats')
1836 includepats = kwargs.get('includepats')
1837 excludepats = kwargs.get('excludepats')
1837 excludepats = kwargs.get('excludepats')
1838
1838
1839 narrowstream = repo.ui.configbool(
1839 narrowstream = repo.ui.configbool(
1840 b'experimental', b'server.stream-narrow-clones'
1840 b'experimental', b'server.stream-narrow-clones'
1841 )
1841 )
1842
1842
1843 if (includepats or excludepats) and not narrowstream:
1843 if (includepats or excludepats) and not narrowstream:
1844 raise error.Abort(_(b'server does not support narrow stream clones'))
1844 raise error.Abort(_(b'server does not support narrow stream clones'))
1845
1845
1846 includeobsmarkers = False
1846 includeobsmarkers = False
1847 if repo.obsstore:
1847 if repo.obsstore:
1848 remoteversions = obsmarkersversion(bundler.capabilities)
1848 remoteversions = obsmarkersversion(bundler.capabilities)
1849 if not remoteversions:
1849 if not remoteversions:
1850 raise error.Abort(
1850 raise error.Abort(
1851 _(
1851 _(
1852 b'server has obsolescence markers, but client '
1852 b'server has obsolescence markers, but client '
1853 b'cannot receive them via stream clone'
1853 b'cannot receive them via stream clone'
1854 )
1854 )
1855 )
1855 )
1856 elif repo.obsstore._version in remoteversions:
1856 elif repo.obsstore._version in remoteversions:
1857 includeobsmarkers = True
1857 includeobsmarkers = True
1858
1858
1859 filecount, bytecount, it = streamclone.generatev2(
1859 filecount, bytecount, it = streamclone.generatev2(
1860 repo, includepats, excludepats, includeobsmarkers
1860 repo, includepats, excludepats, includeobsmarkers
1861 )
1861 )
1862 requirements = _formatrequirementsspec(repo.requirements)
1862 requirements = _formatrequirementsspec(repo.requirements)
1863 part = bundler.newpart(b'stream2', data=it)
1863 part = bundler.newpart(b'stream2', data=it)
1864 part.addparam(b'bytecount', b'%d' % bytecount, mandatory=True)
1864 part.addparam(b'bytecount', b'%d' % bytecount, mandatory=True)
1865 part.addparam(b'filecount', b'%d' % filecount, mandatory=True)
1865 part.addparam(b'filecount', b'%d' % filecount, mandatory=True)
1866 part.addparam(b'requirements', requirements, mandatory=True)
1866 part.addparam(b'requirements', requirements, mandatory=True)
1867
1867
1868
1868
1869 def buildobsmarkerspart(bundler, markers, mandatory=True):
1869 def buildobsmarkerspart(bundler, markers, mandatory=True):
1870 """add an obsmarker part to the bundler with <markers>
1870 """add an obsmarker part to the bundler with <markers>
1871
1871
1872 No part is created if markers is empty.
1872 No part is created if markers is empty.
1873 Raises ValueError if the bundler doesn't support any known obsmarker format.
1873 Raises ValueError if the bundler doesn't support any known obsmarker format.
1874 """
1874 """
1875 if not markers:
1875 if not markers:
1876 return None
1876 return None
1877
1877
1878 remoteversions = obsmarkersversion(bundler.capabilities)
1878 remoteversions = obsmarkersversion(bundler.capabilities)
1879 version = obsolete.commonversion(remoteversions)
1879 version = obsolete.commonversion(remoteversions)
1880 if version is None:
1880 if version is None:
1881 raise ValueError(b'bundler does not support common obsmarker format')
1881 raise ValueError(b'bundler does not support common obsmarker format')
1882 stream = obsolete.encodemarkers(markers, True, version=version)
1882 stream = obsolete.encodemarkers(markers, True, version=version)
1883 return bundler.newpart(b'obsmarkers', data=stream, mandatory=mandatory)
1883 return bundler.newpart(b'obsmarkers', data=stream, mandatory=mandatory)
1884
1884
1885
1885
1886 def writebundle(
1886 def writebundle(
1887 ui, cg, filename, bundletype, vfs=None, compression=None, compopts=None
1887 ui, cg, filename, bundletype, vfs=None, compression=None, compopts=None
1888 ):
1888 ):
1889 """Write a bundle file and return its filename.
1889 """Write a bundle file and return its filename.
1890
1890
1891 Existing files will not be overwritten.
1891 Existing files will not be overwritten.
1892 If no filename is specified, a temporary file is created.
1892 If no filename is specified, a temporary file is created.
1893 bz2 compression can be turned off.
1893 bz2 compression can be turned off.
1894 The bundle file will be deleted in case of errors.
1894 The bundle file will be deleted in case of errors.
1895 """
1895 """
1896
1896
1897 if bundletype == b"HG20":
1897 if bundletype == b"HG20":
1898 bundle = bundle20(ui)
1898 bundle = bundle20(ui)
1899 bundle.setcompression(compression, compopts)
1899 bundle.setcompression(compression, compopts)
1900 part = bundle.newpart(b'changegroup', data=cg.getchunks())
1900 part = bundle.newpart(b'changegroup', data=cg.getchunks())
1901 part.addparam(b'version', cg.version)
1901 part.addparam(b'version', cg.version)
1902 if b'clcount' in cg.extras:
1902 if b'clcount' in cg.extras:
1903 part.addparam(
1903 part.addparam(
1904 b'nbchanges', b'%d' % cg.extras[b'clcount'], mandatory=False
1904 b'nbchanges', b'%d' % cg.extras[b'clcount'], mandatory=False
1905 )
1905 )
1906 chunkiter = bundle.getchunks()
1906 chunkiter = bundle.getchunks()
1907 else:
1907 else:
1908 # compression argument is only for the bundle2 case
1908 # compression argument is only for the bundle2 case
1909 assert compression is None
1909 assert compression is None
1910 if cg.version != b'01':
1910 if cg.version != b'01':
1911 raise error.Abort(
1911 raise error.Abort(
1912 _(b'old bundle types only supports v1 changegroups')
1912 _(b'old bundle types only supports v1 changegroups')
1913 )
1913 )
1914 header, comp = bundletypes[bundletype]
1914 header, comp = bundletypes[bundletype]
1915 if comp not in util.compengines.supportedbundletypes:
1915 if comp not in util.compengines.supportedbundletypes:
1916 raise error.Abort(_(b'unknown stream compression type: %s') % comp)
1916 raise error.Abort(_(b'unknown stream compression type: %s') % comp)
1917 compengine = util.compengines.forbundletype(comp)
1917 compengine = util.compengines.forbundletype(comp)
1918
1918
1919 def chunkiter():
1919 def chunkiter():
1920 yield header
1920 yield header
1921 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1921 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1922 yield chunk
1922 yield chunk
1923
1923
1924 chunkiter = chunkiter()
1924 chunkiter = chunkiter()
1925
1925
1926 # parse the changegroup data, otherwise we will block
1926 # parse the changegroup data, otherwise we will block
1927 # in case of sshrepo because we don't know the end of the stream
1927 # in case of sshrepo because we don't know the end of the stream
1928 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1928 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1929
1929
1930
1930
1931 def combinechangegroupresults(op):
1931 def combinechangegroupresults(op):
1932 """logic to combine 0 or more addchangegroup results into one"""
1932 """logic to combine 0 or more addchangegroup results into one"""
1933 results = [r.get(b'return', 0) for r in op.records[b'changegroup']]
1933 results = [r.get(b'return', 0) for r in op.records[b'changegroup']]
1934 changedheads = 0
1934 changedheads = 0
1935 result = 1
1935 result = 1
1936 for ret in results:
1936 for ret in results:
1937 # If any changegroup result is 0, return 0
1937 # If any changegroup result is 0, return 0
1938 if ret == 0:
1938 if ret == 0:
1939 result = 0
1939 result = 0
1940 break
1940 break
1941 if ret < -1:
1941 if ret < -1:
1942 changedheads += ret + 1
1942 changedheads += ret + 1
1943 elif ret > 1:
1943 elif ret > 1:
1944 changedheads += ret - 1
1944 changedheads += ret - 1
1945 if changedheads > 0:
1945 if changedheads > 0:
1946 result = 1 + changedheads
1946 result = 1 + changedheads
1947 elif changedheads < 0:
1947 elif changedheads < 0:
1948 result = -1 + changedheads
1948 result = -1 + changedheads
1949 return result
1949 return result
1950
1950
1951
1951
1952 @parthandler(
1952 @parthandler(
1953 b'changegroup',
1953 b'changegroup',
1954 (
1954 (
1955 b'version',
1955 b'version',
1956 b'nbchanges',
1956 b'nbchanges',
1957 b'exp-sidedata',
1957 b'exp-sidedata',
1958 b'treemanifest',
1958 b'treemanifest',
1959 b'targetphase',
1959 b'targetphase',
1960 ),
1960 ),
1961 )
1961 )
1962 def handlechangegroup(op, inpart):
1962 def handlechangegroup(op, inpart):
1963 """apply a changegroup part on the repo
1963 """apply a changegroup part on the repo"""
1964
1965 This is a very early implementation that will massive rework before being
1966 inflicted to any end-user.
1967 """
1968 from . import localrepo
1964 from . import localrepo
1969
1965
1970 tr = op.gettransaction()
1966 tr = op.gettransaction()
1971 unpackerversion = inpart.params.get(b'version', b'01')
1967 unpackerversion = inpart.params.get(b'version', b'01')
1972 # We should raise an appropriate exception here
1968 # We should raise an appropriate exception here
1973 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1969 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1974 # the source and url passed here are overwritten by the one contained in
1970 # the source and url passed here are overwritten by the one contained in
1975 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1971 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1976 nbchangesets = None
1972 nbchangesets = None
1977 if b'nbchanges' in inpart.params:
1973 if b'nbchanges' in inpart.params:
1978 nbchangesets = int(inpart.params.get(b'nbchanges'))
1974 nbchangesets = int(inpart.params.get(b'nbchanges'))
1979 if b'treemanifest' in inpart.params and not scmutil.istreemanifest(op.repo):
1975 if b'treemanifest' in inpart.params and not scmutil.istreemanifest(op.repo):
1980 if len(op.repo.changelog) != 0:
1976 if len(op.repo.changelog) != 0:
1981 raise error.Abort(
1977 raise error.Abort(
1982 _(
1978 _(
1983 b"bundle contains tree manifests, but local repo is "
1979 b"bundle contains tree manifests, but local repo is "
1984 b"non-empty and does not use tree manifests"
1980 b"non-empty and does not use tree manifests"
1985 )
1981 )
1986 )
1982 )
1987 op.repo.requirements.add(requirements.TREEMANIFEST_REQUIREMENT)
1983 op.repo.requirements.add(requirements.TREEMANIFEST_REQUIREMENT)
1988 op.repo.svfs.options = localrepo.resolvestorevfsoptions(
1984 op.repo.svfs.options = localrepo.resolvestorevfsoptions(
1989 op.repo.ui, op.repo.requirements, op.repo.features
1985 op.repo.ui, op.repo.requirements, op.repo.features
1990 )
1986 )
1991 scmutil.writereporequirements(op.repo)
1987 scmutil.writereporequirements(op.repo)
1992
1988
1993 bundlesidedata = bool(b'exp-sidedata' in inpart.params)
1989 bundlesidedata = bool(b'exp-sidedata' in inpart.params)
1994 reposidedata = bool(b'exp-sidedata-flag' in op.repo.requirements)
1990 reposidedata = bool(b'exp-sidedata-flag' in op.repo.requirements)
1995 if reposidedata and not bundlesidedata:
1991 if reposidedata and not bundlesidedata:
1996 msg = b"repository is using sidedata but the bundle source do not"
1992 msg = b"repository is using sidedata but the bundle source do not"
1997 hint = b'this is currently unsupported'
1993 hint = b'this is currently unsupported'
1998 raise error.Abort(msg, hint=hint)
1994 raise error.Abort(msg, hint=hint)
1999
1995
2000 extrakwargs = {}
1996 extrakwargs = {}
2001 targetphase = inpart.params.get(b'targetphase')
1997 targetphase = inpart.params.get(b'targetphase')
2002 if targetphase is not None:
1998 if targetphase is not None:
2003 extrakwargs['targetphase'] = int(targetphase)
1999 extrakwargs['targetphase'] = int(targetphase)
2004 ret = _processchangegroup(
2000 ret = _processchangegroup(
2005 op,
2001 op,
2006 cg,
2002 cg,
2007 tr,
2003 tr,
2008 b'bundle2',
2004 b'bundle2',
2009 b'bundle2',
2005 b'bundle2',
2010 expectedtotal=nbchangesets,
2006 expectedtotal=nbchangesets,
2011 **extrakwargs
2007 **extrakwargs
2012 )
2008 )
2013 if op.reply is not None:
2009 if op.reply is not None:
2014 # This is definitely not the final form of this
2010 # This is definitely not the final form of this
2015 # return. But one need to start somewhere.
2011 # return. But one need to start somewhere.
2016 part = op.reply.newpart(b'reply:changegroup', mandatory=False)
2012 part = op.reply.newpart(b'reply:changegroup', mandatory=False)
2017 part.addparam(
2013 part.addparam(
2018 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2014 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2019 )
2015 )
2020 part.addparam(b'return', b'%i' % ret, mandatory=False)
2016 part.addparam(b'return', b'%i' % ret, mandatory=False)
2021 assert not inpart.read()
2017 assert not inpart.read()
2022
2018
2023
2019
2024 _remotechangegroupparams = tuple(
2020 _remotechangegroupparams = tuple(
2025 [b'url', b'size', b'digests']
2021 [b'url', b'size', b'digests']
2026 + [b'digest:%s' % k for k in util.DIGESTS.keys()]
2022 + [b'digest:%s' % k for k in util.DIGESTS.keys()]
2027 )
2023 )
2028
2024
2029
2025
2030 @parthandler(b'remote-changegroup', _remotechangegroupparams)
2026 @parthandler(b'remote-changegroup', _remotechangegroupparams)
2031 def handleremotechangegroup(op, inpart):
2027 def handleremotechangegroup(op, inpart):
2032 """apply a bundle10 on the repo, given an url and validation information
2028 """apply a bundle10 on the repo, given an url and validation information
2033
2029
2034 All the information about the remote bundle to import are given as
2030 All the information about the remote bundle to import are given as
2035 parameters. The parameters include:
2031 parameters. The parameters include:
2036 - url: the url to the bundle10.
2032 - url: the url to the bundle10.
2037 - size: the bundle10 file size. It is used to validate what was
2033 - size: the bundle10 file size. It is used to validate what was
2038 retrieved by the client matches the server knowledge about the bundle.
2034 retrieved by the client matches the server knowledge about the bundle.
2039 - digests: a space separated list of the digest types provided as
2035 - digests: a space separated list of the digest types provided as
2040 parameters.
2036 parameters.
2041 - digest:<digest-type>: the hexadecimal representation of the digest with
2037 - digest:<digest-type>: the hexadecimal representation of the digest with
2042 that name. Like the size, it is used to validate what was retrieved by
2038 that name. Like the size, it is used to validate what was retrieved by
2043 the client matches what the server knows about the bundle.
2039 the client matches what the server knows about the bundle.
2044
2040
2045 When multiple digest types are given, all of them are checked.
2041 When multiple digest types are given, all of them are checked.
2046 """
2042 """
2047 try:
2043 try:
2048 raw_url = inpart.params[b'url']
2044 raw_url = inpart.params[b'url']
2049 except KeyError:
2045 except KeyError:
2050 raise error.Abort(_(b'remote-changegroup: missing "%s" param') % b'url')
2046 raise error.Abort(_(b'remote-changegroup: missing "%s" param') % b'url')
2051 parsed_url = util.url(raw_url)
2047 parsed_url = util.url(raw_url)
2052 if parsed_url.scheme not in capabilities[b'remote-changegroup']:
2048 if parsed_url.scheme not in capabilities[b'remote-changegroup']:
2053 raise error.Abort(
2049 raise error.Abort(
2054 _(b'remote-changegroup does not support %s urls')
2050 _(b'remote-changegroup does not support %s urls')
2055 % parsed_url.scheme
2051 % parsed_url.scheme
2056 )
2052 )
2057
2053
2058 try:
2054 try:
2059 size = int(inpart.params[b'size'])
2055 size = int(inpart.params[b'size'])
2060 except ValueError:
2056 except ValueError:
2061 raise error.Abort(
2057 raise error.Abort(
2062 _(b'remote-changegroup: invalid value for param "%s"') % b'size'
2058 _(b'remote-changegroup: invalid value for param "%s"') % b'size'
2063 )
2059 )
2064 except KeyError:
2060 except KeyError:
2065 raise error.Abort(
2061 raise error.Abort(
2066 _(b'remote-changegroup: missing "%s" param') % b'size'
2062 _(b'remote-changegroup: missing "%s" param') % b'size'
2067 )
2063 )
2068
2064
2069 digests = {}
2065 digests = {}
2070 for typ in inpart.params.get(b'digests', b'').split():
2066 for typ in inpart.params.get(b'digests', b'').split():
2071 param = b'digest:%s' % typ
2067 param = b'digest:%s' % typ
2072 try:
2068 try:
2073 value = inpart.params[param]
2069 value = inpart.params[param]
2074 except KeyError:
2070 except KeyError:
2075 raise error.Abort(
2071 raise error.Abort(
2076 _(b'remote-changegroup: missing "%s" param') % param
2072 _(b'remote-changegroup: missing "%s" param') % param
2077 )
2073 )
2078 digests[typ] = value
2074 digests[typ] = value
2079
2075
2080 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
2076 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
2081
2077
2082 tr = op.gettransaction()
2078 tr = op.gettransaction()
2083 from . import exchange
2079 from . import exchange
2084
2080
2085 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
2081 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
2086 if not isinstance(cg, changegroup.cg1unpacker):
2082 if not isinstance(cg, changegroup.cg1unpacker):
2087 raise error.Abort(
2083 raise error.Abort(
2088 _(b'%s: not a bundle version 1.0') % util.hidepassword(raw_url)
2084 _(b'%s: not a bundle version 1.0') % util.hidepassword(raw_url)
2089 )
2085 )
2090 ret = _processchangegroup(op, cg, tr, b'bundle2', b'bundle2')
2086 ret = _processchangegroup(op, cg, tr, b'bundle2', b'bundle2')
2091 if op.reply is not None:
2087 if op.reply is not None:
2092 # This is definitely not the final form of this
2088 # This is definitely not the final form of this
2093 # return. But one need to start somewhere.
2089 # return. But one need to start somewhere.
2094 part = op.reply.newpart(b'reply:changegroup')
2090 part = op.reply.newpart(b'reply:changegroup')
2095 part.addparam(
2091 part.addparam(
2096 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2092 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2097 )
2093 )
2098 part.addparam(b'return', b'%i' % ret, mandatory=False)
2094 part.addparam(b'return', b'%i' % ret, mandatory=False)
2099 try:
2095 try:
2100 real_part.validate()
2096 real_part.validate()
2101 except error.Abort as e:
2097 except error.Abort as e:
2102 raise error.Abort(
2098 raise error.Abort(
2103 _(b'bundle at %s is corrupted:\n%s')
2099 _(b'bundle at %s is corrupted:\n%s')
2104 % (util.hidepassword(raw_url), e.message)
2100 % (util.hidepassword(raw_url), e.message)
2105 )
2101 )
2106 assert not inpart.read()
2102 assert not inpart.read()
2107
2103
2108
2104
2109 @parthandler(b'reply:changegroup', (b'return', b'in-reply-to'))
2105 @parthandler(b'reply:changegroup', (b'return', b'in-reply-to'))
2110 def handlereplychangegroup(op, inpart):
2106 def handlereplychangegroup(op, inpart):
2111 ret = int(inpart.params[b'return'])
2107 ret = int(inpart.params[b'return'])
2112 replyto = int(inpart.params[b'in-reply-to'])
2108 replyto = int(inpart.params[b'in-reply-to'])
2113 op.records.add(b'changegroup', {b'return': ret}, replyto)
2109 op.records.add(b'changegroup', {b'return': ret}, replyto)
2114
2110
2115
2111
2116 @parthandler(b'check:bookmarks')
2112 @parthandler(b'check:bookmarks')
2117 def handlecheckbookmarks(op, inpart):
2113 def handlecheckbookmarks(op, inpart):
2118 """check location of bookmarks
2114 """check location of bookmarks
2119
2115
2120 This part is to be used to detect push race regarding bookmark, it
2116 This part is to be used to detect push race regarding bookmark, it
2121 contains binary encoded (bookmark, node) tuple. If the local state does
2117 contains binary encoded (bookmark, node) tuple. If the local state does
2122 not marks the one in the part, a PushRaced exception is raised
2118 not marks the one in the part, a PushRaced exception is raised
2123 """
2119 """
2124 bookdata = bookmarks.binarydecode(inpart)
2120 bookdata = bookmarks.binarydecode(inpart)
2125
2121
2126 msgstandard = (
2122 msgstandard = (
2127 b'remote repository changed while pushing - please try again '
2123 b'remote repository changed while pushing - please try again '
2128 b'(bookmark "%s" move from %s to %s)'
2124 b'(bookmark "%s" move from %s to %s)'
2129 )
2125 )
2130 msgmissing = (
2126 msgmissing = (
2131 b'remote repository changed while pushing - please try again '
2127 b'remote repository changed while pushing - please try again '
2132 b'(bookmark "%s" is missing, expected %s)'
2128 b'(bookmark "%s" is missing, expected %s)'
2133 )
2129 )
2134 msgexist = (
2130 msgexist = (
2135 b'remote repository changed while pushing - please try again '
2131 b'remote repository changed while pushing - please try again '
2136 b'(bookmark "%s" set on %s, expected missing)'
2132 b'(bookmark "%s" set on %s, expected missing)'
2137 )
2133 )
2138 for book, node in bookdata:
2134 for book, node in bookdata:
2139 currentnode = op.repo._bookmarks.get(book)
2135 currentnode = op.repo._bookmarks.get(book)
2140 if currentnode != node:
2136 if currentnode != node:
2141 if node is None:
2137 if node is None:
2142 finalmsg = msgexist % (book, short(currentnode))
2138 finalmsg = msgexist % (book, short(currentnode))
2143 elif currentnode is None:
2139 elif currentnode is None:
2144 finalmsg = msgmissing % (book, short(node))
2140 finalmsg = msgmissing % (book, short(node))
2145 else:
2141 else:
2146 finalmsg = msgstandard % (
2142 finalmsg = msgstandard % (
2147 book,
2143 book,
2148 short(node),
2144 short(node),
2149 short(currentnode),
2145 short(currentnode),
2150 )
2146 )
2151 raise error.PushRaced(finalmsg)
2147 raise error.PushRaced(finalmsg)
2152
2148
2153
2149
2154 @parthandler(b'check:heads')
2150 @parthandler(b'check:heads')
2155 def handlecheckheads(op, inpart):
2151 def handlecheckheads(op, inpart):
2156 """check that head of the repo did not change
2152 """check that head of the repo did not change
2157
2153
2158 This is used to detect a push race when using unbundle.
2154 This is used to detect a push race when using unbundle.
2159 This replaces the "heads" argument of unbundle."""
2155 This replaces the "heads" argument of unbundle."""
2160 h = inpart.read(20)
2156 h = inpart.read(20)
2161 heads = []
2157 heads = []
2162 while len(h) == 20:
2158 while len(h) == 20:
2163 heads.append(h)
2159 heads.append(h)
2164 h = inpart.read(20)
2160 h = inpart.read(20)
2165 assert not h
2161 assert not h
2166 # Trigger a transaction so that we are guaranteed to have the lock now.
2162 # Trigger a transaction so that we are guaranteed to have the lock now.
2167 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2163 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2168 op.gettransaction()
2164 op.gettransaction()
2169 if sorted(heads) != sorted(op.repo.heads()):
2165 if sorted(heads) != sorted(op.repo.heads()):
2170 raise error.PushRaced(
2166 raise error.PushRaced(
2171 b'remote repository changed while pushing - please try again'
2167 b'remote repository changed while pushing - please try again'
2172 )
2168 )
2173
2169
2174
2170
2175 @parthandler(b'check:updated-heads')
2171 @parthandler(b'check:updated-heads')
2176 def handlecheckupdatedheads(op, inpart):
2172 def handlecheckupdatedheads(op, inpart):
2177 """check for race on the heads touched by a push
2173 """check for race on the heads touched by a push
2178
2174
2179 This is similar to 'check:heads' but focus on the heads actually updated
2175 This is similar to 'check:heads' but focus on the heads actually updated
2180 during the push. If other activities happen on unrelated heads, it is
2176 during the push. If other activities happen on unrelated heads, it is
2181 ignored.
2177 ignored.
2182
2178
2183 This allow server with high traffic to avoid push contention as long as
2179 This allow server with high traffic to avoid push contention as long as
2184 unrelated parts of the graph are involved."""
2180 unrelated parts of the graph are involved."""
2185 h = inpart.read(20)
2181 h = inpart.read(20)
2186 heads = []
2182 heads = []
2187 while len(h) == 20:
2183 while len(h) == 20:
2188 heads.append(h)
2184 heads.append(h)
2189 h = inpart.read(20)
2185 h = inpart.read(20)
2190 assert not h
2186 assert not h
2191 # trigger a transaction so that we are guaranteed to have the lock now.
2187 # trigger a transaction so that we are guaranteed to have the lock now.
2192 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2188 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2193 op.gettransaction()
2189 op.gettransaction()
2194
2190
2195 currentheads = set()
2191 currentheads = set()
2196 for ls in op.repo.branchmap().iterheads():
2192 for ls in op.repo.branchmap().iterheads():
2197 currentheads.update(ls)
2193 currentheads.update(ls)
2198
2194
2199 for h in heads:
2195 for h in heads:
2200 if h not in currentheads:
2196 if h not in currentheads:
2201 raise error.PushRaced(
2197 raise error.PushRaced(
2202 b'remote repository changed while pushing - '
2198 b'remote repository changed while pushing - '
2203 b'please try again'
2199 b'please try again'
2204 )
2200 )
2205
2201
2206
2202
2207 @parthandler(b'check:phases')
2203 @parthandler(b'check:phases')
2208 def handlecheckphases(op, inpart):
2204 def handlecheckphases(op, inpart):
2209 """check that phase boundaries of the repository did not change
2205 """check that phase boundaries of the repository did not change
2210
2206
2211 This is used to detect a push race.
2207 This is used to detect a push race.
2212 """
2208 """
2213 phasetonodes = phases.binarydecode(inpart)
2209 phasetonodes = phases.binarydecode(inpart)
2214 unfi = op.repo.unfiltered()
2210 unfi = op.repo.unfiltered()
2215 cl = unfi.changelog
2211 cl = unfi.changelog
2216 phasecache = unfi._phasecache
2212 phasecache = unfi._phasecache
2217 msg = (
2213 msg = (
2218 b'remote repository changed while pushing - please try again '
2214 b'remote repository changed while pushing - please try again '
2219 b'(%s is %s expected %s)'
2215 b'(%s is %s expected %s)'
2220 )
2216 )
2221 for expectedphase, nodes in pycompat.iteritems(phasetonodes):
2217 for expectedphase, nodes in pycompat.iteritems(phasetonodes):
2222 for n in nodes:
2218 for n in nodes:
2223 actualphase = phasecache.phase(unfi, cl.rev(n))
2219 actualphase = phasecache.phase(unfi, cl.rev(n))
2224 if actualphase != expectedphase:
2220 if actualphase != expectedphase:
2225 finalmsg = msg % (
2221 finalmsg = msg % (
2226 short(n),
2222 short(n),
2227 phases.phasenames[actualphase],
2223 phases.phasenames[actualphase],
2228 phases.phasenames[expectedphase],
2224 phases.phasenames[expectedphase],
2229 )
2225 )
2230 raise error.PushRaced(finalmsg)
2226 raise error.PushRaced(finalmsg)
2231
2227
2232
2228
2233 @parthandler(b'output')
2229 @parthandler(b'output')
2234 def handleoutput(op, inpart):
2230 def handleoutput(op, inpart):
2235 """forward output captured on the server to the client"""
2231 """forward output captured on the server to the client"""
2236 for line in inpart.read().splitlines():
2232 for line in inpart.read().splitlines():
2237 op.ui.status(_(b'remote: %s\n') % line)
2233 op.ui.status(_(b'remote: %s\n') % line)
2238
2234
2239
2235
2240 @parthandler(b'replycaps')
2236 @parthandler(b'replycaps')
2241 def handlereplycaps(op, inpart):
2237 def handlereplycaps(op, inpart):
2242 """Notify that a reply bundle should be created
2238 """Notify that a reply bundle should be created
2243
2239
2244 The payload contains the capabilities information for the reply"""
2240 The payload contains the capabilities information for the reply"""
2245 caps = decodecaps(inpart.read())
2241 caps = decodecaps(inpart.read())
2246 if op.reply is None:
2242 if op.reply is None:
2247 op.reply = bundle20(op.ui, caps)
2243 op.reply = bundle20(op.ui, caps)
2248
2244
2249
2245
2250 class AbortFromPart(error.Abort):
2246 class AbortFromPart(error.Abort):
2251 """Sub-class of Abort that denotes an error from a bundle2 part."""
2247 """Sub-class of Abort that denotes an error from a bundle2 part."""
2252
2248
2253
2249
2254 @parthandler(b'error:abort', (b'message', b'hint'))
2250 @parthandler(b'error:abort', (b'message', b'hint'))
2255 def handleerrorabort(op, inpart):
2251 def handleerrorabort(op, inpart):
2256 """Used to transmit abort error over the wire"""
2252 """Used to transmit abort error over the wire"""
2257 raise AbortFromPart(
2253 raise AbortFromPart(
2258 inpart.params[b'message'], hint=inpart.params.get(b'hint')
2254 inpart.params[b'message'], hint=inpart.params.get(b'hint')
2259 )
2255 )
2260
2256
2261
2257
2262 @parthandler(
2258 @parthandler(
2263 b'error:pushkey',
2259 b'error:pushkey',
2264 (b'namespace', b'key', b'new', b'old', b'ret', b'in-reply-to'),
2260 (b'namespace', b'key', b'new', b'old', b'ret', b'in-reply-to'),
2265 )
2261 )
2266 def handleerrorpushkey(op, inpart):
2262 def handleerrorpushkey(op, inpart):
2267 """Used to transmit failure of a mandatory pushkey over the wire"""
2263 """Used to transmit failure of a mandatory pushkey over the wire"""
2268 kwargs = {}
2264 kwargs = {}
2269 for name in (b'namespace', b'key', b'new', b'old', b'ret'):
2265 for name in (b'namespace', b'key', b'new', b'old', b'ret'):
2270 value = inpart.params.get(name)
2266 value = inpart.params.get(name)
2271 if value is not None:
2267 if value is not None:
2272 kwargs[name] = value
2268 kwargs[name] = value
2273 raise error.PushkeyFailed(
2269 raise error.PushkeyFailed(
2274 inpart.params[b'in-reply-to'], **pycompat.strkwargs(kwargs)
2270 inpart.params[b'in-reply-to'], **pycompat.strkwargs(kwargs)
2275 )
2271 )
2276
2272
2277
2273
2278 @parthandler(b'error:unsupportedcontent', (b'parttype', b'params'))
2274 @parthandler(b'error:unsupportedcontent', (b'parttype', b'params'))
2279 def handleerrorunsupportedcontent(op, inpart):
2275 def handleerrorunsupportedcontent(op, inpart):
2280 """Used to transmit unknown content error over the wire"""
2276 """Used to transmit unknown content error over the wire"""
2281 kwargs = {}
2277 kwargs = {}
2282 parttype = inpart.params.get(b'parttype')
2278 parttype = inpart.params.get(b'parttype')
2283 if parttype is not None:
2279 if parttype is not None:
2284 kwargs[b'parttype'] = parttype
2280 kwargs[b'parttype'] = parttype
2285 params = inpart.params.get(b'params')
2281 params = inpart.params.get(b'params')
2286 if params is not None:
2282 if params is not None:
2287 kwargs[b'params'] = params.split(b'\0')
2283 kwargs[b'params'] = params.split(b'\0')
2288
2284
2289 raise error.BundleUnknownFeatureError(**pycompat.strkwargs(kwargs))
2285 raise error.BundleUnknownFeatureError(**pycompat.strkwargs(kwargs))
2290
2286
2291
2287
2292 @parthandler(b'error:pushraced', (b'message',))
2288 @parthandler(b'error:pushraced', (b'message',))
2293 def handleerrorpushraced(op, inpart):
2289 def handleerrorpushraced(op, inpart):
2294 """Used to transmit push race error over the wire"""
2290 """Used to transmit push race error over the wire"""
2295 raise error.ResponseError(_(b'push failed:'), inpart.params[b'message'])
2291 raise error.ResponseError(_(b'push failed:'), inpart.params[b'message'])
2296
2292
2297
2293
2298 @parthandler(b'listkeys', (b'namespace',))
2294 @parthandler(b'listkeys', (b'namespace',))
2299 def handlelistkeys(op, inpart):
2295 def handlelistkeys(op, inpart):
2300 """retrieve pushkey namespace content stored in a bundle2"""
2296 """retrieve pushkey namespace content stored in a bundle2"""
2301 namespace = inpart.params[b'namespace']
2297 namespace = inpart.params[b'namespace']
2302 r = pushkey.decodekeys(inpart.read())
2298 r = pushkey.decodekeys(inpart.read())
2303 op.records.add(b'listkeys', (namespace, r))
2299 op.records.add(b'listkeys', (namespace, r))
2304
2300
2305
2301
2306 @parthandler(b'pushkey', (b'namespace', b'key', b'old', b'new'))
2302 @parthandler(b'pushkey', (b'namespace', b'key', b'old', b'new'))
2307 def handlepushkey(op, inpart):
2303 def handlepushkey(op, inpart):
2308 """process a pushkey request"""
2304 """process a pushkey request"""
2309 dec = pushkey.decode
2305 dec = pushkey.decode
2310 namespace = dec(inpart.params[b'namespace'])
2306 namespace = dec(inpart.params[b'namespace'])
2311 key = dec(inpart.params[b'key'])
2307 key = dec(inpart.params[b'key'])
2312 old = dec(inpart.params[b'old'])
2308 old = dec(inpart.params[b'old'])
2313 new = dec(inpart.params[b'new'])
2309 new = dec(inpart.params[b'new'])
2314 # Grab the transaction to ensure that we have the lock before performing the
2310 # Grab the transaction to ensure that we have the lock before performing the
2315 # pushkey.
2311 # pushkey.
2316 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2312 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2317 op.gettransaction()
2313 op.gettransaction()
2318 ret = op.repo.pushkey(namespace, key, old, new)
2314 ret = op.repo.pushkey(namespace, key, old, new)
2319 record = {b'namespace': namespace, b'key': key, b'old': old, b'new': new}
2315 record = {b'namespace': namespace, b'key': key, b'old': old, b'new': new}
2320 op.records.add(b'pushkey', record)
2316 op.records.add(b'pushkey', record)
2321 if op.reply is not None:
2317 if op.reply is not None:
2322 rpart = op.reply.newpart(b'reply:pushkey')
2318 rpart = op.reply.newpart(b'reply:pushkey')
2323 rpart.addparam(
2319 rpart.addparam(
2324 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2320 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2325 )
2321 )
2326 rpart.addparam(b'return', b'%i' % ret, mandatory=False)
2322 rpart.addparam(b'return', b'%i' % ret, mandatory=False)
2327 if inpart.mandatory and not ret:
2323 if inpart.mandatory and not ret:
2328 kwargs = {}
2324 kwargs = {}
2329 for key in (b'namespace', b'key', b'new', b'old', b'ret'):
2325 for key in (b'namespace', b'key', b'new', b'old', b'ret'):
2330 if key in inpart.params:
2326 if key in inpart.params:
2331 kwargs[key] = inpart.params[key]
2327 kwargs[key] = inpart.params[key]
2332 raise error.PushkeyFailed(
2328 raise error.PushkeyFailed(
2333 partid=b'%d' % inpart.id, **pycompat.strkwargs(kwargs)
2329 partid=b'%d' % inpart.id, **pycompat.strkwargs(kwargs)
2334 )
2330 )
2335
2331
2336
2332
2337 @parthandler(b'bookmarks')
2333 @parthandler(b'bookmarks')
2338 def handlebookmark(op, inpart):
2334 def handlebookmark(op, inpart):
2339 """transmit bookmark information
2335 """transmit bookmark information
2340
2336
2341 The part contains binary encoded bookmark information.
2337 The part contains binary encoded bookmark information.
2342
2338
2343 The exact behavior of this part can be controlled by the 'bookmarks' mode
2339 The exact behavior of this part can be controlled by the 'bookmarks' mode
2344 on the bundle operation.
2340 on the bundle operation.
2345
2341
2346 When mode is 'apply' (the default) the bookmark information is applied as
2342 When mode is 'apply' (the default) the bookmark information is applied as
2347 is to the unbundling repository. Make sure a 'check:bookmarks' part is
2343 is to the unbundling repository. Make sure a 'check:bookmarks' part is
2348 issued earlier to check for push races in such update. This behavior is
2344 issued earlier to check for push races in such update. This behavior is
2349 suitable for pushing.
2345 suitable for pushing.
2350
2346
2351 When mode is 'records', the information is recorded into the 'bookmarks'
2347 When mode is 'records', the information is recorded into the 'bookmarks'
2352 records of the bundle operation. This behavior is suitable for pulling.
2348 records of the bundle operation. This behavior is suitable for pulling.
2353 """
2349 """
2354 changes = bookmarks.binarydecode(inpart)
2350 changes = bookmarks.binarydecode(inpart)
2355
2351
2356 pushkeycompat = op.repo.ui.configbool(
2352 pushkeycompat = op.repo.ui.configbool(
2357 b'server', b'bookmarks-pushkey-compat'
2353 b'server', b'bookmarks-pushkey-compat'
2358 )
2354 )
2359 bookmarksmode = op.modes.get(b'bookmarks', b'apply')
2355 bookmarksmode = op.modes.get(b'bookmarks', b'apply')
2360
2356
2361 if bookmarksmode == b'apply':
2357 if bookmarksmode == b'apply':
2362 tr = op.gettransaction()
2358 tr = op.gettransaction()
2363 bookstore = op.repo._bookmarks
2359 bookstore = op.repo._bookmarks
2364 if pushkeycompat:
2360 if pushkeycompat:
2365 allhooks = []
2361 allhooks = []
2366 for book, node in changes:
2362 for book, node in changes:
2367 hookargs = tr.hookargs.copy()
2363 hookargs = tr.hookargs.copy()
2368 hookargs[b'pushkeycompat'] = b'1'
2364 hookargs[b'pushkeycompat'] = b'1'
2369 hookargs[b'namespace'] = b'bookmarks'
2365 hookargs[b'namespace'] = b'bookmarks'
2370 hookargs[b'key'] = book
2366 hookargs[b'key'] = book
2371 hookargs[b'old'] = hex(bookstore.get(book, b''))
2367 hookargs[b'old'] = hex(bookstore.get(book, b''))
2372 hookargs[b'new'] = hex(node if node is not None else b'')
2368 hookargs[b'new'] = hex(node if node is not None else b'')
2373 allhooks.append(hookargs)
2369 allhooks.append(hookargs)
2374
2370
2375 for hookargs in allhooks:
2371 for hookargs in allhooks:
2376 op.repo.hook(
2372 op.repo.hook(
2377 b'prepushkey', throw=True, **pycompat.strkwargs(hookargs)
2373 b'prepushkey', throw=True, **pycompat.strkwargs(hookargs)
2378 )
2374 )
2379
2375
2380 for book, node in changes:
2376 for book, node in changes:
2381 if bookmarks.isdivergent(book):
2377 if bookmarks.isdivergent(book):
2382 msg = _(b'cannot accept divergent bookmark %s!') % book
2378 msg = _(b'cannot accept divergent bookmark %s!') % book
2383 raise error.Abort(msg)
2379 raise error.Abort(msg)
2384
2380
2385 bookstore.applychanges(op.repo, op.gettransaction(), changes)
2381 bookstore.applychanges(op.repo, op.gettransaction(), changes)
2386
2382
2387 if pushkeycompat:
2383 if pushkeycompat:
2388
2384
2389 def runhook(unused_success):
2385 def runhook(unused_success):
2390 for hookargs in allhooks:
2386 for hookargs in allhooks:
2391 op.repo.hook(b'pushkey', **pycompat.strkwargs(hookargs))
2387 op.repo.hook(b'pushkey', **pycompat.strkwargs(hookargs))
2392
2388
2393 op.repo._afterlock(runhook)
2389 op.repo._afterlock(runhook)
2394
2390
2395 elif bookmarksmode == b'records':
2391 elif bookmarksmode == b'records':
2396 for book, node in changes:
2392 for book, node in changes:
2397 record = {b'bookmark': book, b'node': node}
2393 record = {b'bookmark': book, b'node': node}
2398 op.records.add(b'bookmarks', record)
2394 op.records.add(b'bookmarks', record)
2399 else:
2395 else:
2400 raise error.ProgrammingError(
2396 raise error.ProgrammingError(
2401 b'unkown bookmark mode: %s' % bookmarksmode
2397 b'unkown bookmark mode: %s' % bookmarksmode
2402 )
2398 )
2403
2399
2404
2400
2405 @parthandler(b'phase-heads')
2401 @parthandler(b'phase-heads')
2406 def handlephases(op, inpart):
2402 def handlephases(op, inpart):
2407 """apply phases from bundle part to repo"""
2403 """apply phases from bundle part to repo"""
2408 headsbyphase = phases.binarydecode(inpart)
2404 headsbyphase = phases.binarydecode(inpart)
2409 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
2405 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
2410
2406
2411
2407
2412 @parthandler(b'reply:pushkey', (b'return', b'in-reply-to'))
2408 @parthandler(b'reply:pushkey', (b'return', b'in-reply-to'))
2413 def handlepushkeyreply(op, inpart):
2409 def handlepushkeyreply(op, inpart):
2414 """retrieve the result of a pushkey request"""
2410 """retrieve the result of a pushkey request"""
2415 ret = int(inpart.params[b'return'])
2411 ret = int(inpart.params[b'return'])
2416 partid = int(inpart.params[b'in-reply-to'])
2412 partid = int(inpart.params[b'in-reply-to'])
2417 op.records.add(b'pushkey', {b'return': ret}, partid)
2413 op.records.add(b'pushkey', {b'return': ret}, partid)
2418
2414
2419
2415
2420 @parthandler(b'obsmarkers')
2416 @parthandler(b'obsmarkers')
2421 def handleobsmarker(op, inpart):
2417 def handleobsmarker(op, inpart):
2422 """add a stream of obsmarkers to the repo"""
2418 """add a stream of obsmarkers to the repo"""
2423 tr = op.gettransaction()
2419 tr = op.gettransaction()
2424 markerdata = inpart.read()
2420 markerdata = inpart.read()
2425 if op.ui.config(b'experimental', b'obsmarkers-exchange-debug'):
2421 if op.ui.config(b'experimental', b'obsmarkers-exchange-debug'):
2426 op.ui.writenoi18n(
2422 op.ui.writenoi18n(
2427 b'obsmarker-exchange: %i bytes received\n' % len(markerdata)
2423 b'obsmarker-exchange: %i bytes received\n' % len(markerdata)
2428 )
2424 )
2429 # The mergemarkers call will crash if marker creation is not enabled.
2425 # The mergemarkers call will crash if marker creation is not enabled.
2430 # we want to avoid this if the part is advisory.
2426 # we want to avoid this if the part is advisory.
2431 if not inpart.mandatory and op.repo.obsstore.readonly:
2427 if not inpart.mandatory and op.repo.obsstore.readonly:
2432 op.repo.ui.debug(
2428 op.repo.ui.debug(
2433 b'ignoring obsolescence markers, feature not enabled\n'
2429 b'ignoring obsolescence markers, feature not enabled\n'
2434 )
2430 )
2435 return
2431 return
2436 new = op.repo.obsstore.mergemarkers(tr, markerdata)
2432 new = op.repo.obsstore.mergemarkers(tr, markerdata)
2437 op.repo.invalidatevolatilesets()
2433 op.repo.invalidatevolatilesets()
2438 op.records.add(b'obsmarkers', {b'new': new})
2434 op.records.add(b'obsmarkers', {b'new': new})
2439 if op.reply is not None:
2435 if op.reply is not None:
2440 rpart = op.reply.newpart(b'reply:obsmarkers')
2436 rpart = op.reply.newpart(b'reply:obsmarkers')
2441 rpart.addparam(
2437 rpart.addparam(
2442 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2438 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2443 )
2439 )
2444 rpart.addparam(b'new', b'%i' % new, mandatory=False)
2440 rpart.addparam(b'new', b'%i' % new, mandatory=False)
2445
2441
2446
2442
2447 @parthandler(b'reply:obsmarkers', (b'new', b'in-reply-to'))
2443 @parthandler(b'reply:obsmarkers', (b'new', b'in-reply-to'))
2448 def handleobsmarkerreply(op, inpart):
2444 def handleobsmarkerreply(op, inpart):
2449 """retrieve the result of a pushkey request"""
2445 """retrieve the result of a pushkey request"""
2450 ret = int(inpart.params[b'new'])
2446 ret = int(inpart.params[b'new'])
2451 partid = int(inpart.params[b'in-reply-to'])
2447 partid = int(inpart.params[b'in-reply-to'])
2452 op.records.add(b'obsmarkers', {b'new': ret}, partid)
2448 op.records.add(b'obsmarkers', {b'new': ret}, partid)
2453
2449
2454
2450
2455 @parthandler(b'hgtagsfnodes')
2451 @parthandler(b'hgtagsfnodes')
2456 def handlehgtagsfnodes(op, inpart):
2452 def handlehgtagsfnodes(op, inpart):
2457 """Applies .hgtags fnodes cache entries to the local repo.
2453 """Applies .hgtags fnodes cache entries to the local repo.
2458
2454
2459 Payload is pairs of 20 byte changeset nodes and filenodes.
2455 Payload is pairs of 20 byte changeset nodes and filenodes.
2460 """
2456 """
2461 # Grab the transaction so we ensure that we have the lock at this point.
2457 # Grab the transaction so we ensure that we have the lock at this point.
2462 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2458 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2463 op.gettransaction()
2459 op.gettransaction()
2464 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
2460 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
2465
2461
2466 count = 0
2462 count = 0
2467 while True:
2463 while True:
2468 node = inpart.read(20)
2464 node = inpart.read(20)
2469 fnode = inpart.read(20)
2465 fnode = inpart.read(20)
2470 if len(node) < 20 or len(fnode) < 20:
2466 if len(node) < 20 or len(fnode) < 20:
2471 op.ui.debug(b'ignoring incomplete received .hgtags fnodes data\n')
2467 op.ui.debug(b'ignoring incomplete received .hgtags fnodes data\n')
2472 break
2468 break
2473 cache.setfnode(node, fnode)
2469 cache.setfnode(node, fnode)
2474 count += 1
2470 count += 1
2475
2471
2476 cache.write()
2472 cache.write()
2477 op.ui.debug(b'applied %i hgtags fnodes cache entries\n' % count)
2473 op.ui.debug(b'applied %i hgtags fnodes cache entries\n' % count)
2478
2474
2479
2475
2480 rbcstruct = struct.Struct(b'>III')
2476 rbcstruct = struct.Struct(b'>III')
2481
2477
2482
2478
2483 @parthandler(b'cache:rev-branch-cache')
2479 @parthandler(b'cache:rev-branch-cache')
2484 def handlerbc(op, inpart):
2480 def handlerbc(op, inpart):
2485 """receive a rev-branch-cache payload and update the local cache
2481 """receive a rev-branch-cache payload and update the local cache
2486
2482
2487 The payload is a series of data related to each branch
2483 The payload is a series of data related to each branch
2488
2484
2489 1) branch name length
2485 1) branch name length
2490 2) number of open heads
2486 2) number of open heads
2491 3) number of closed heads
2487 3) number of closed heads
2492 4) open heads nodes
2488 4) open heads nodes
2493 5) closed heads nodes
2489 5) closed heads nodes
2494 """
2490 """
2495 total = 0
2491 total = 0
2496 rawheader = inpart.read(rbcstruct.size)
2492 rawheader = inpart.read(rbcstruct.size)
2497 cache = op.repo.revbranchcache()
2493 cache = op.repo.revbranchcache()
2498 cl = op.repo.unfiltered().changelog
2494 cl = op.repo.unfiltered().changelog
2499 while rawheader:
2495 while rawheader:
2500 header = rbcstruct.unpack(rawheader)
2496 header = rbcstruct.unpack(rawheader)
2501 total += header[1] + header[2]
2497 total += header[1] + header[2]
2502 utf8branch = inpart.read(header[0])
2498 utf8branch = inpart.read(header[0])
2503 branch = encoding.tolocal(utf8branch)
2499 branch = encoding.tolocal(utf8branch)
2504 for x in pycompat.xrange(header[1]):
2500 for x in pycompat.xrange(header[1]):
2505 node = inpart.read(20)
2501 node = inpart.read(20)
2506 rev = cl.rev(node)
2502 rev = cl.rev(node)
2507 cache.setdata(branch, rev, node, False)
2503 cache.setdata(branch, rev, node, False)
2508 for x in pycompat.xrange(header[2]):
2504 for x in pycompat.xrange(header[2]):
2509 node = inpart.read(20)
2505 node = inpart.read(20)
2510 rev = cl.rev(node)
2506 rev = cl.rev(node)
2511 cache.setdata(branch, rev, node, True)
2507 cache.setdata(branch, rev, node, True)
2512 rawheader = inpart.read(rbcstruct.size)
2508 rawheader = inpart.read(rbcstruct.size)
2513 cache.write()
2509 cache.write()
2514
2510
2515
2511
2516 @parthandler(b'pushvars')
2512 @parthandler(b'pushvars')
2517 def bundle2getvars(op, part):
2513 def bundle2getvars(op, part):
2518 '''unbundle a bundle2 containing shellvars on the server'''
2514 '''unbundle a bundle2 containing shellvars on the server'''
2519 # An option to disable unbundling on server-side for security reasons
2515 # An option to disable unbundling on server-side for security reasons
2520 if op.ui.configbool(b'push', b'pushvars.server'):
2516 if op.ui.configbool(b'push', b'pushvars.server'):
2521 hookargs = {}
2517 hookargs = {}
2522 for key, value in part.advisoryparams:
2518 for key, value in part.advisoryparams:
2523 key = key.upper()
2519 key = key.upper()
2524 # We want pushed variables to have USERVAR_ prepended so we know
2520 # We want pushed variables to have USERVAR_ prepended so we know
2525 # they came from the --pushvar flag.
2521 # they came from the --pushvar flag.
2526 key = b"USERVAR_" + key
2522 key = b"USERVAR_" + key
2527 hookargs[key] = value
2523 hookargs[key] = value
2528 op.addhookargs(hookargs)
2524 op.addhookargs(hookargs)
2529
2525
2530
2526
2531 @parthandler(b'stream2', (b'requirements', b'filecount', b'bytecount'))
2527 @parthandler(b'stream2', (b'requirements', b'filecount', b'bytecount'))
2532 def handlestreamv2bundle(op, part):
2528 def handlestreamv2bundle(op, part):
2533
2529
2534 requirements = urlreq.unquote(part.params[b'requirements']).split(b',')
2530 requirements = urlreq.unquote(part.params[b'requirements']).split(b',')
2535 filecount = int(part.params[b'filecount'])
2531 filecount = int(part.params[b'filecount'])
2536 bytecount = int(part.params[b'bytecount'])
2532 bytecount = int(part.params[b'bytecount'])
2537
2533
2538 repo = op.repo
2534 repo = op.repo
2539 if len(repo):
2535 if len(repo):
2540 msg = _(b'cannot apply stream clone to non empty repository')
2536 msg = _(b'cannot apply stream clone to non empty repository')
2541 raise error.Abort(msg)
2537 raise error.Abort(msg)
2542
2538
2543 repo.ui.debug(b'applying stream bundle\n')
2539 repo.ui.debug(b'applying stream bundle\n')
2544 streamclone.applybundlev2(repo, part, filecount, bytecount, requirements)
2540 streamclone.applybundlev2(repo, part, filecount, bytecount, requirements)
2545
2541
2546
2542
2547 def widen_bundle(
2543 def widen_bundle(
2548 bundler, repo, oldmatcher, newmatcher, common, known, cgversion, ellipses
2544 bundler, repo, oldmatcher, newmatcher, common, known, cgversion, ellipses
2549 ):
2545 ):
2550 """generates bundle2 for widening a narrow clone
2546 """generates bundle2 for widening a narrow clone
2551
2547
2552 bundler is the bundle to which data should be added
2548 bundler is the bundle to which data should be added
2553 repo is the localrepository instance
2549 repo is the localrepository instance
2554 oldmatcher matches what the client already has
2550 oldmatcher matches what the client already has
2555 newmatcher matches what the client needs (including what it already has)
2551 newmatcher matches what the client needs (including what it already has)
2556 common is set of common heads between server and client
2552 common is set of common heads between server and client
2557 known is a set of revs known on the client side (used in ellipses)
2553 known is a set of revs known on the client side (used in ellipses)
2558 cgversion is the changegroup version to send
2554 cgversion is the changegroup version to send
2559 ellipses is boolean value telling whether to send ellipses data or not
2555 ellipses is boolean value telling whether to send ellipses data or not
2560
2556
2561 returns bundle2 of the data required for extending
2557 returns bundle2 of the data required for extending
2562 """
2558 """
2563 commonnodes = set()
2559 commonnodes = set()
2564 cl = repo.changelog
2560 cl = repo.changelog
2565 for r in repo.revs(b"::%ln", common):
2561 for r in repo.revs(b"::%ln", common):
2566 commonnodes.add(cl.node(r))
2562 commonnodes.add(cl.node(r))
2567 if commonnodes:
2563 if commonnodes:
2568 # XXX: we should only send the filelogs (and treemanifest). user
2564 # XXX: we should only send the filelogs (and treemanifest). user
2569 # already has the changelog and manifest
2565 # already has the changelog and manifest
2570 packer = changegroup.getbundler(
2566 packer = changegroup.getbundler(
2571 cgversion,
2567 cgversion,
2572 repo,
2568 repo,
2573 oldmatcher=oldmatcher,
2569 oldmatcher=oldmatcher,
2574 matcher=newmatcher,
2570 matcher=newmatcher,
2575 fullnodes=commonnodes,
2571 fullnodes=commonnodes,
2576 )
2572 )
2577 cgdata = packer.generate(
2573 cgdata = packer.generate(
2578 {nullid},
2574 {nullid},
2579 list(commonnodes),
2575 list(commonnodes),
2580 False,
2576 False,
2581 b'narrow_widen',
2577 b'narrow_widen',
2582 changelog=False,
2578 changelog=False,
2583 )
2579 )
2584
2580
2585 part = bundler.newpart(b'changegroup', data=cgdata)
2581 part = bundler.newpart(b'changegroup', data=cgdata)
2586 part.addparam(b'version', cgversion)
2582 part.addparam(b'version', cgversion)
2587 if scmutil.istreemanifest(repo):
2583 if scmutil.istreemanifest(repo):
2588 part.addparam(b'treemanifest', b'1')
2584 part.addparam(b'treemanifest', b'1')
2589 if b'exp-sidedata-flag' in repo.requirements:
2585 if b'exp-sidedata-flag' in repo.requirements:
2590 part.addparam(b'exp-sidedata', b'1')
2586 part.addparam(b'exp-sidedata', b'1')
2591
2587
2592 return bundler
2588 return bundler
General Comments 0
You need to be logged in to leave comments. Login now