##// END OF EJS Templates
py3: fully fix bundlepart.__repr__ to return str not bytes...
Kyle Lippincott -
r44759:74172a23 stable
parent child Browse files
Show More
@@ -1,2579 +1,2578 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part header. When the header is empty
69 The total number of Bytes used by the part header. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 from __future__ import absolute_import, division
148 from __future__ import absolute_import, division
149
149
150 import collections
150 import collections
151 import errno
151 import errno
152 import os
152 import os
153 import re
153 import re
154 import string
154 import string
155 import struct
155 import struct
156 import sys
156 import sys
157
157
158 from .i18n import _
158 from .i18n import _
159 from . import (
159 from . import (
160 bookmarks,
160 bookmarks,
161 changegroup,
161 changegroup,
162 encoding,
162 encoding,
163 error,
163 error,
164 node as nodemod,
164 node as nodemod,
165 obsolete,
165 obsolete,
166 phases,
166 phases,
167 pushkey,
167 pushkey,
168 pycompat,
168 pycompat,
169 streamclone,
169 streamclone,
170 tags,
170 tags,
171 url,
171 url,
172 util,
172 util,
173 )
173 )
174 from .utils import stringutil
174 from .utils import stringutil
175
175
176 urlerr = util.urlerr
176 urlerr = util.urlerr
177 urlreq = util.urlreq
177 urlreq = util.urlreq
178
178
179 _pack = struct.pack
179 _pack = struct.pack
180 _unpack = struct.unpack
180 _unpack = struct.unpack
181
181
182 _fstreamparamsize = b'>i'
182 _fstreamparamsize = b'>i'
183 _fpartheadersize = b'>i'
183 _fpartheadersize = b'>i'
184 _fparttypesize = b'>B'
184 _fparttypesize = b'>B'
185 _fpartid = b'>I'
185 _fpartid = b'>I'
186 _fpayloadsize = b'>i'
186 _fpayloadsize = b'>i'
187 _fpartparamcount = b'>BB'
187 _fpartparamcount = b'>BB'
188
188
189 preferedchunksize = 32768
189 preferedchunksize = 32768
190
190
191 _parttypeforbidden = re.compile(b'[^a-zA-Z0-9_:-]')
191 _parttypeforbidden = re.compile(b'[^a-zA-Z0-9_:-]')
192
192
193
193
194 def outdebug(ui, message):
194 def outdebug(ui, message):
195 """debug regarding output stream (bundling)"""
195 """debug regarding output stream (bundling)"""
196 if ui.configbool(b'devel', b'bundle2.debug'):
196 if ui.configbool(b'devel', b'bundle2.debug'):
197 ui.debug(b'bundle2-output: %s\n' % message)
197 ui.debug(b'bundle2-output: %s\n' % message)
198
198
199
199
200 def indebug(ui, message):
200 def indebug(ui, message):
201 """debug on input stream (unbundling)"""
201 """debug on input stream (unbundling)"""
202 if ui.configbool(b'devel', b'bundle2.debug'):
202 if ui.configbool(b'devel', b'bundle2.debug'):
203 ui.debug(b'bundle2-input: %s\n' % message)
203 ui.debug(b'bundle2-input: %s\n' % message)
204
204
205
205
206 def validateparttype(parttype):
206 def validateparttype(parttype):
207 """raise ValueError if a parttype contains invalid character"""
207 """raise ValueError if a parttype contains invalid character"""
208 if _parttypeforbidden.search(parttype):
208 if _parttypeforbidden.search(parttype):
209 raise ValueError(parttype)
209 raise ValueError(parttype)
210
210
211
211
212 def _makefpartparamsizes(nbparams):
212 def _makefpartparamsizes(nbparams):
213 """return a struct format to read part parameter sizes
213 """return a struct format to read part parameter sizes
214
214
215 The number parameters is variable so we need to build that format
215 The number parameters is variable so we need to build that format
216 dynamically.
216 dynamically.
217 """
217 """
218 return b'>' + (b'BB' * nbparams)
218 return b'>' + (b'BB' * nbparams)
219
219
220
220
221 parthandlermapping = {}
221 parthandlermapping = {}
222
222
223
223
224 def parthandler(parttype, params=()):
224 def parthandler(parttype, params=()):
225 """decorator that register a function as a bundle2 part handler
225 """decorator that register a function as a bundle2 part handler
226
226
227 eg::
227 eg::
228
228
229 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
229 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
230 def myparttypehandler(...):
230 def myparttypehandler(...):
231 '''process a part of type "my part".'''
231 '''process a part of type "my part".'''
232 ...
232 ...
233 """
233 """
234 validateparttype(parttype)
234 validateparttype(parttype)
235
235
236 def _decorator(func):
236 def _decorator(func):
237 lparttype = parttype.lower() # enforce lower case matching.
237 lparttype = parttype.lower() # enforce lower case matching.
238 assert lparttype not in parthandlermapping
238 assert lparttype not in parthandlermapping
239 parthandlermapping[lparttype] = func
239 parthandlermapping[lparttype] = func
240 func.params = frozenset(params)
240 func.params = frozenset(params)
241 return func
241 return func
242
242
243 return _decorator
243 return _decorator
244
244
245
245
246 class unbundlerecords(object):
246 class unbundlerecords(object):
247 """keep record of what happens during and unbundle
247 """keep record of what happens during and unbundle
248
248
249 New records are added using `records.add('cat', obj)`. Where 'cat' is a
249 New records are added using `records.add('cat', obj)`. Where 'cat' is a
250 category of record and obj is an arbitrary object.
250 category of record and obj is an arbitrary object.
251
251
252 `records['cat']` will return all entries of this category 'cat'.
252 `records['cat']` will return all entries of this category 'cat'.
253
253
254 Iterating on the object itself will yield `('category', obj)` tuples
254 Iterating on the object itself will yield `('category', obj)` tuples
255 for all entries.
255 for all entries.
256
256
257 All iterations happens in chronological order.
257 All iterations happens in chronological order.
258 """
258 """
259
259
260 def __init__(self):
260 def __init__(self):
261 self._categories = {}
261 self._categories = {}
262 self._sequences = []
262 self._sequences = []
263 self._replies = {}
263 self._replies = {}
264
264
265 def add(self, category, entry, inreplyto=None):
265 def add(self, category, entry, inreplyto=None):
266 """add a new record of a given category.
266 """add a new record of a given category.
267
267
268 The entry can then be retrieved in the list returned by
268 The entry can then be retrieved in the list returned by
269 self['category']."""
269 self['category']."""
270 self._categories.setdefault(category, []).append(entry)
270 self._categories.setdefault(category, []).append(entry)
271 self._sequences.append((category, entry))
271 self._sequences.append((category, entry))
272 if inreplyto is not None:
272 if inreplyto is not None:
273 self.getreplies(inreplyto).add(category, entry)
273 self.getreplies(inreplyto).add(category, entry)
274
274
275 def getreplies(self, partid):
275 def getreplies(self, partid):
276 """get the records that are replies to a specific part"""
276 """get the records that are replies to a specific part"""
277 return self._replies.setdefault(partid, unbundlerecords())
277 return self._replies.setdefault(partid, unbundlerecords())
278
278
279 def __getitem__(self, cat):
279 def __getitem__(self, cat):
280 return tuple(self._categories.get(cat, ()))
280 return tuple(self._categories.get(cat, ()))
281
281
282 def __iter__(self):
282 def __iter__(self):
283 return iter(self._sequences)
283 return iter(self._sequences)
284
284
285 def __len__(self):
285 def __len__(self):
286 return len(self._sequences)
286 return len(self._sequences)
287
287
288 def __nonzero__(self):
288 def __nonzero__(self):
289 return bool(self._sequences)
289 return bool(self._sequences)
290
290
291 __bool__ = __nonzero__
291 __bool__ = __nonzero__
292
292
293
293
294 class bundleoperation(object):
294 class bundleoperation(object):
295 """an object that represents a single bundling process
295 """an object that represents a single bundling process
296
296
297 Its purpose is to carry unbundle-related objects and states.
297 Its purpose is to carry unbundle-related objects and states.
298
298
299 A new object should be created at the beginning of each bundle processing.
299 A new object should be created at the beginning of each bundle processing.
300 The object is to be returned by the processing function.
300 The object is to be returned by the processing function.
301
301
302 The object has very little content now it will ultimately contain:
302 The object has very little content now it will ultimately contain:
303 * an access to the repo the bundle is applied to,
303 * an access to the repo the bundle is applied to,
304 * a ui object,
304 * a ui object,
305 * a way to retrieve a transaction to add changes to the repo,
305 * a way to retrieve a transaction to add changes to the repo,
306 * a way to record the result of processing each part,
306 * a way to record the result of processing each part,
307 * a way to construct a bundle response when applicable.
307 * a way to construct a bundle response when applicable.
308 """
308 """
309
309
310 def __init__(self, repo, transactiongetter, captureoutput=True, source=b''):
310 def __init__(self, repo, transactiongetter, captureoutput=True, source=b''):
311 self.repo = repo
311 self.repo = repo
312 self.ui = repo.ui
312 self.ui = repo.ui
313 self.records = unbundlerecords()
313 self.records = unbundlerecords()
314 self.reply = None
314 self.reply = None
315 self.captureoutput = captureoutput
315 self.captureoutput = captureoutput
316 self.hookargs = {}
316 self.hookargs = {}
317 self._gettransaction = transactiongetter
317 self._gettransaction = transactiongetter
318 # carries value that can modify part behavior
318 # carries value that can modify part behavior
319 self.modes = {}
319 self.modes = {}
320 self.source = source
320 self.source = source
321
321
322 def gettransaction(self):
322 def gettransaction(self):
323 transaction = self._gettransaction()
323 transaction = self._gettransaction()
324
324
325 if self.hookargs:
325 if self.hookargs:
326 # the ones added to the transaction supercede those added
326 # the ones added to the transaction supercede those added
327 # to the operation.
327 # to the operation.
328 self.hookargs.update(transaction.hookargs)
328 self.hookargs.update(transaction.hookargs)
329 transaction.hookargs = self.hookargs
329 transaction.hookargs = self.hookargs
330
330
331 # mark the hookargs as flushed. further attempts to add to
331 # mark the hookargs as flushed. further attempts to add to
332 # hookargs will result in an abort.
332 # hookargs will result in an abort.
333 self.hookargs = None
333 self.hookargs = None
334
334
335 return transaction
335 return transaction
336
336
337 def addhookargs(self, hookargs):
337 def addhookargs(self, hookargs):
338 if self.hookargs is None:
338 if self.hookargs is None:
339 raise error.ProgrammingError(
339 raise error.ProgrammingError(
340 b'attempted to add hookargs to '
340 b'attempted to add hookargs to '
341 b'operation after transaction started'
341 b'operation after transaction started'
342 )
342 )
343 self.hookargs.update(hookargs)
343 self.hookargs.update(hookargs)
344
344
345
345
346 class TransactionUnavailable(RuntimeError):
346 class TransactionUnavailable(RuntimeError):
347 pass
347 pass
348
348
349
349
350 def _notransaction():
350 def _notransaction():
351 """default method to get a transaction while processing a bundle
351 """default method to get a transaction while processing a bundle
352
352
353 Raise an exception to highlight the fact that no transaction was expected
353 Raise an exception to highlight the fact that no transaction was expected
354 to be created"""
354 to be created"""
355 raise TransactionUnavailable()
355 raise TransactionUnavailable()
356
356
357
357
358 def applybundle(repo, unbundler, tr, source, url=None, **kwargs):
358 def applybundle(repo, unbundler, tr, source, url=None, **kwargs):
359 # transform me into unbundler.apply() as soon as the freeze is lifted
359 # transform me into unbundler.apply() as soon as the freeze is lifted
360 if isinstance(unbundler, unbundle20):
360 if isinstance(unbundler, unbundle20):
361 tr.hookargs[b'bundle2'] = b'1'
361 tr.hookargs[b'bundle2'] = b'1'
362 if source is not None and b'source' not in tr.hookargs:
362 if source is not None and b'source' not in tr.hookargs:
363 tr.hookargs[b'source'] = source
363 tr.hookargs[b'source'] = source
364 if url is not None and b'url' not in tr.hookargs:
364 if url is not None and b'url' not in tr.hookargs:
365 tr.hookargs[b'url'] = url
365 tr.hookargs[b'url'] = url
366 return processbundle(repo, unbundler, lambda: tr, source=source)
366 return processbundle(repo, unbundler, lambda: tr, source=source)
367 else:
367 else:
368 # the transactiongetter won't be used, but we might as well set it
368 # the transactiongetter won't be used, but we might as well set it
369 op = bundleoperation(repo, lambda: tr, source=source)
369 op = bundleoperation(repo, lambda: tr, source=source)
370 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
370 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
371 return op
371 return op
372
372
373
373
374 class partiterator(object):
374 class partiterator(object):
375 def __init__(self, repo, op, unbundler):
375 def __init__(self, repo, op, unbundler):
376 self.repo = repo
376 self.repo = repo
377 self.op = op
377 self.op = op
378 self.unbundler = unbundler
378 self.unbundler = unbundler
379 self.iterator = None
379 self.iterator = None
380 self.count = 0
380 self.count = 0
381 self.current = None
381 self.current = None
382
382
383 def __enter__(self):
383 def __enter__(self):
384 def func():
384 def func():
385 itr = enumerate(self.unbundler.iterparts(), 1)
385 itr = enumerate(self.unbundler.iterparts(), 1)
386 for count, p in itr:
386 for count, p in itr:
387 self.count = count
387 self.count = count
388 self.current = p
388 self.current = p
389 yield p
389 yield p
390 p.consume()
390 p.consume()
391 self.current = None
391 self.current = None
392
392
393 self.iterator = func()
393 self.iterator = func()
394 return self.iterator
394 return self.iterator
395
395
396 def __exit__(self, type, exc, tb):
396 def __exit__(self, type, exc, tb):
397 if not self.iterator:
397 if not self.iterator:
398 return
398 return
399
399
400 # Only gracefully abort in a normal exception situation. User aborts
400 # Only gracefully abort in a normal exception situation. User aborts
401 # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
401 # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
402 # and should not gracefully cleanup.
402 # and should not gracefully cleanup.
403 if isinstance(exc, Exception):
403 if isinstance(exc, Exception):
404 # Any exceptions seeking to the end of the bundle at this point are
404 # Any exceptions seeking to the end of the bundle at this point are
405 # almost certainly related to the underlying stream being bad.
405 # almost certainly related to the underlying stream being bad.
406 # And, chances are that the exception we're handling is related to
406 # And, chances are that the exception we're handling is related to
407 # getting in that bad state. So, we swallow the seeking error and
407 # getting in that bad state. So, we swallow the seeking error and
408 # re-raise the original error.
408 # re-raise the original error.
409 seekerror = False
409 seekerror = False
410 try:
410 try:
411 if self.current:
411 if self.current:
412 # consume the part content to not corrupt the stream.
412 # consume the part content to not corrupt the stream.
413 self.current.consume()
413 self.current.consume()
414
414
415 for part in self.iterator:
415 for part in self.iterator:
416 # consume the bundle content
416 # consume the bundle content
417 part.consume()
417 part.consume()
418 except Exception:
418 except Exception:
419 seekerror = True
419 seekerror = True
420
420
421 # Small hack to let caller code distinguish exceptions from bundle2
421 # Small hack to let caller code distinguish exceptions from bundle2
422 # processing from processing the old format. This is mostly needed
422 # processing from processing the old format. This is mostly needed
423 # to handle different return codes to unbundle according to the type
423 # to handle different return codes to unbundle according to the type
424 # of bundle. We should probably clean up or drop this return code
424 # of bundle. We should probably clean up or drop this return code
425 # craziness in a future version.
425 # craziness in a future version.
426 exc.duringunbundle2 = True
426 exc.duringunbundle2 = True
427 salvaged = []
427 salvaged = []
428 replycaps = None
428 replycaps = None
429 if self.op.reply is not None:
429 if self.op.reply is not None:
430 salvaged = self.op.reply.salvageoutput()
430 salvaged = self.op.reply.salvageoutput()
431 replycaps = self.op.reply.capabilities
431 replycaps = self.op.reply.capabilities
432 exc._replycaps = replycaps
432 exc._replycaps = replycaps
433 exc._bundle2salvagedoutput = salvaged
433 exc._bundle2salvagedoutput = salvaged
434
434
435 # Re-raising from a variable loses the original stack. So only use
435 # Re-raising from a variable loses the original stack. So only use
436 # that form if we need to.
436 # that form if we need to.
437 if seekerror:
437 if seekerror:
438 raise exc
438 raise exc
439
439
440 self.repo.ui.debug(
440 self.repo.ui.debug(
441 b'bundle2-input-bundle: %i parts total\n' % self.count
441 b'bundle2-input-bundle: %i parts total\n' % self.count
442 )
442 )
443
443
444
444
445 def processbundle(repo, unbundler, transactiongetter=None, op=None, source=b''):
445 def processbundle(repo, unbundler, transactiongetter=None, op=None, source=b''):
446 """This function process a bundle, apply effect to/from a repo
446 """This function process a bundle, apply effect to/from a repo
447
447
448 It iterates over each part then searches for and uses the proper handling
448 It iterates over each part then searches for and uses the proper handling
449 code to process the part. Parts are processed in order.
449 code to process the part. Parts are processed in order.
450
450
451 Unknown Mandatory part will abort the process.
451 Unknown Mandatory part will abort the process.
452
452
453 It is temporarily possible to provide a prebuilt bundleoperation to the
453 It is temporarily possible to provide a prebuilt bundleoperation to the
454 function. This is used to ensure output is properly propagated in case of
454 function. This is used to ensure output is properly propagated in case of
455 an error during the unbundling. This output capturing part will likely be
455 an error during the unbundling. This output capturing part will likely be
456 reworked and this ability will probably go away in the process.
456 reworked and this ability will probably go away in the process.
457 """
457 """
458 if op is None:
458 if op is None:
459 if transactiongetter is None:
459 if transactiongetter is None:
460 transactiongetter = _notransaction
460 transactiongetter = _notransaction
461 op = bundleoperation(repo, transactiongetter, source=source)
461 op = bundleoperation(repo, transactiongetter, source=source)
462 # todo:
462 # todo:
463 # - replace this is a init function soon.
463 # - replace this is a init function soon.
464 # - exception catching
464 # - exception catching
465 unbundler.params
465 unbundler.params
466 if repo.ui.debugflag:
466 if repo.ui.debugflag:
467 msg = [b'bundle2-input-bundle:']
467 msg = [b'bundle2-input-bundle:']
468 if unbundler.params:
468 if unbundler.params:
469 msg.append(b' %i params' % len(unbundler.params))
469 msg.append(b' %i params' % len(unbundler.params))
470 if op._gettransaction is None or op._gettransaction is _notransaction:
470 if op._gettransaction is None or op._gettransaction is _notransaction:
471 msg.append(b' no-transaction')
471 msg.append(b' no-transaction')
472 else:
472 else:
473 msg.append(b' with-transaction')
473 msg.append(b' with-transaction')
474 msg.append(b'\n')
474 msg.append(b'\n')
475 repo.ui.debug(b''.join(msg))
475 repo.ui.debug(b''.join(msg))
476
476
477 processparts(repo, op, unbundler)
477 processparts(repo, op, unbundler)
478
478
479 return op
479 return op
480
480
481
481
482 def processparts(repo, op, unbundler):
482 def processparts(repo, op, unbundler):
483 with partiterator(repo, op, unbundler) as parts:
483 with partiterator(repo, op, unbundler) as parts:
484 for part in parts:
484 for part in parts:
485 _processpart(op, part)
485 _processpart(op, part)
486
486
487
487
488 def _processchangegroup(op, cg, tr, source, url, **kwargs):
488 def _processchangegroup(op, cg, tr, source, url, **kwargs):
489 ret = cg.apply(op.repo, tr, source, url, **kwargs)
489 ret = cg.apply(op.repo, tr, source, url, **kwargs)
490 op.records.add(b'changegroup', {b'return': ret,})
490 op.records.add(b'changegroup', {b'return': ret,})
491 return ret
491 return ret
492
492
493
493
494 def _gethandler(op, part):
494 def _gethandler(op, part):
495 status = b'unknown' # used by debug output
495 status = b'unknown' # used by debug output
496 try:
496 try:
497 handler = parthandlermapping.get(part.type)
497 handler = parthandlermapping.get(part.type)
498 if handler is None:
498 if handler is None:
499 status = b'unsupported-type'
499 status = b'unsupported-type'
500 raise error.BundleUnknownFeatureError(parttype=part.type)
500 raise error.BundleUnknownFeatureError(parttype=part.type)
501 indebug(op.ui, b'found a handler for part %s' % part.type)
501 indebug(op.ui, b'found a handler for part %s' % part.type)
502 unknownparams = part.mandatorykeys - handler.params
502 unknownparams = part.mandatorykeys - handler.params
503 if unknownparams:
503 if unknownparams:
504 unknownparams = list(unknownparams)
504 unknownparams = list(unknownparams)
505 unknownparams.sort()
505 unknownparams.sort()
506 status = b'unsupported-params (%s)' % b', '.join(unknownparams)
506 status = b'unsupported-params (%s)' % b', '.join(unknownparams)
507 raise error.BundleUnknownFeatureError(
507 raise error.BundleUnknownFeatureError(
508 parttype=part.type, params=unknownparams
508 parttype=part.type, params=unknownparams
509 )
509 )
510 status = b'supported'
510 status = b'supported'
511 except error.BundleUnknownFeatureError as exc:
511 except error.BundleUnknownFeatureError as exc:
512 if part.mandatory: # mandatory parts
512 if part.mandatory: # mandatory parts
513 raise
513 raise
514 indebug(op.ui, b'ignoring unsupported advisory part %s' % exc)
514 indebug(op.ui, b'ignoring unsupported advisory part %s' % exc)
515 return # skip to part processing
515 return # skip to part processing
516 finally:
516 finally:
517 if op.ui.debugflag:
517 if op.ui.debugflag:
518 msg = [b'bundle2-input-part: "%s"' % part.type]
518 msg = [b'bundle2-input-part: "%s"' % part.type]
519 if not part.mandatory:
519 if not part.mandatory:
520 msg.append(b' (advisory)')
520 msg.append(b' (advisory)')
521 nbmp = len(part.mandatorykeys)
521 nbmp = len(part.mandatorykeys)
522 nbap = len(part.params) - nbmp
522 nbap = len(part.params) - nbmp
523 if nbmp or nbap:
523 if nbmp or nbap:
524 msg.append(b' (params:')
524 msg.append(b' (params:')
525 if nbmp:
525 if nbmp:
526 msg.append(b' %i mandatory' % nbmp)
526 msg.append(b' %i mandatory' % nbmp)
527 if nbap:
527 if nbap:
528 msg.append(b' %i advisory' % nbmp)
528 msg.append(b' %i advisory' % nbmp)
529 msg.append(b')')
529 msg.append(b')')
530 msg.append(b' %s\n' % status)
530 msg.append(b' %s\n' % status)
531 op.ui.debug(b''.join(msg))
531 op.ui.debug(b''.join(msg))
532
532
533 return handler
533 return handler
534
534
535
535
536 def _processpart(op, part):
536 def _processpart(op, part):
537 """process a single part from a bundle
537 """process a single part from a bundle
538
538
539 The part is guaranteed to have been fully consumed when the function exits
539 The part is guaranteed to have been fully consumed when the function exits
540 (even if an exception is raised)."""
540 (even if an exception is raised)."""
541 handler = _gethandler(op, part)
541 handler = _gethandler(op, part)
542 if handler is None:
542 if handler is None:
543 return
543 return
544
544
545 # handler is called outside the above try block so that we don't
545 # handler is called outside the above try block so that we don't
546 # risk catching KeyErrors from anything other than the
546 # risk catching KeyErrors from anything other than the
547 # parthandlermapping lookup (any KeyError raised by handler()
547 # parthandlermapping lookup (any KeyError raised by handler()
548 # itself represents a defect of a different variety).
548 # itself represents a defect of a different variety).
549 output = None
549 output = None
550 if op.captureoutput and op.reply is not None:
550 if op.captureoutput and op.reply is not None:
551 op.ui.pushbuffer(error=True, subproc=True)
551 op.ui.pushbuffer(error=True, subproc=True)
552 output = b''
552 output = b''
553 try:
553 try:
554 handler(op, part)
554 handler(op, part)
555 finally:
555 finally:
556 if output is not None:
556 if output is not None:
557 output = op.ui.popbuffer()
557 output = op.ui.popbuffer()
558 if output:
558 if output:
559 outpart = op.reply.newpart(b'output', data=output, mandatory=False)
559 outpart = op.reply.newpart(b'output', data=output, mandatory=False)
560 outpart.addparam(
560 outpart.addparam(
561 b'in-reply-to', pycompat.bytestr(part.id), mandatory=False
561 b'in-reply-to', pycompat.bytestr(part.id), mandatory=False
562 )
562 )
563
563
564
564
565 def decodecaps(blob):
565 def decodecaps(blob):
566 """decode a bundle2 caps bytes blob into a dictionary
566 """decode a bundle2 caps bytes blob into a dictionary
567
567
568 The blob is a list of capabilities (one per line)
568 The blob is a list of capabilities (one per line)
569 Capabilities may have values using a line of the form::
569 Capabilities may have values using a line of the form::
570
570
571 capability=value1,value2,value3
571 capability=value1,value2,value3
572
572
573 The values are always a list."""
573 The values are always a list."""
574 caps = {}
574 caps = {}
575 for line in blob.splitlines():
575 for line in blob.splitlines():
576 if not line:
576 if not line:
577 continue
577 continue
578 if b'=' not in line:
578 if b'=' not in line:
579 key, vals = line, ()
579 key, vals = line, ()
580 else:
580 else:
581 key, vals = line.split(b'=', 1)
581 key, vals = line.split(b'=', 1)
582 vals = vals.split(b',')
582 vals = vals.split(b',')
583 key = urlreq.unquote(key)
583 key = urlreq.unquote(key)
584 vals = [urlreq.unquote(v) for v in vals]
584 vals = [urlreq.unquote(v) for v in vals]
585 caps[key] = vals
585 caps[key] = vals
586 return caps
586 return caps
587
587
588
588
589 def encodecaps(caps):
589 def encodecaps(caps):
590 """encode a bundle2 caps dictionary into a bytes blob"""
590 """encode a bundle2 caps dictionary into a bytes blob"""
591 chunks = []
591 chunks = []
592 for ca in sorted(caps):
592 for ca in sorted(caps):
593 vals = caps[ca]
593 vals = caps[ca]
594 ca = urlreq.quote(ca)
594 ca = urlreq.quote(ca)
595 vals = [urlreq.quote(v) for v in vals]
595 vals = [urlreq.quote(v) for v in vals]
596 if vals:
596 if vals:
597 ca = b"%s=%s" % (ca, b','.join(vals))
597 ca = b"%s=%s" % (ca, b','.join(vals))
598 chunks.append(ca)
598 chunks.append(ca)
599 return b'\n'.join(chunks)
599 return b'\n'.join(chunks)
600
600
601
601
602 bundletypes = {
602 bundletypes = {
603 b"": (b"", b'UN'), # only when using unbundle on ssh and old http servers
603 b"": (b"", b'UN'), # only when using unbundle on ssh and old http servers
604 # since the unification ssh accepts a header but there
604 # since the unification ssh accepts a header but there
605 # is no capability signaling it.
605 # is no capability signaling it.
606 b"HG20": (), # special-cased below
606 b"HG20": (), # special-cased below
607 b"HG10UN": (b"HG10UN", b'UN'),
607 b"HG10UN": (b"HG10UN", b'UN'),
608 b"HG10BZ": (b"HG10", b'BZ'),
608 b"HG10BZ": (b"HG10", b'BZ'),
609 b"HG10GZ": (b"HG10GZ", b'GZ'),
609 b"HG10GZ": (b"HG10GZ", b'GZ'),
610 }
610 }
611
611
612 # hgweb uses this list to communicate its preferred type
612 # hgweb uses this list to communicate its preferred type
613 bundlepriority = [b'HG10GZ', b'HG10BZ', b'HG10UN']
613 bundlepriority = [b'HG10GZ', b'HG10BZ', b'HG10UN']
614
614
615
615
616 class bundle20(object):
616 class bundle20(object):
617 """represent an outgoing bundle2 container
617 """represent an outgoing bundle2 container
618
618
619 Use the `addparam` method to add stream level parameter. and `newpart` to
619 Use the `addparam` method to add stream level parameter. and `newpart` to
620 populate it. Then call `getchunks` to retrieve all the binary chunks of
620 populate it. Then call `getchunks` to retrieve all the binary chunks of
621 data that compose the bundle2 container."""
621 data that compose the bundle2 container."""
622
622
623 _magicstring = b'HG20'
623 _magicstring = b'HG20'
624
624
625 def __init__(self, ui, capabilities=()):
625 def __init__(self, ui, capabilities=()):
626 self.ui = ui
626 self.ui = ui
627 self._params = []
627 self._params = []
628 self._parts = []
628 self._parts = []
629 self.capabilities = dict(capabilities)
629 self.capabilities = dict(capabilities)
630 self._compengine = util.compengines.forbundletype(b'UN')
630 self._compengine = util.compengines.forbundletype(b'UN')
631 self._compopts = None
631 self._compopts = None
632 # If compression is being handled by a consumer of the raw
632 # If compression is being handled by a consumer of the raw
633 # data (e.g. the wire protocol), unsetting this flag tells
633 # data (e.g. the wire protocol), unsetting this flag tells
634 # consumers that the bundle is best left uncompressed.
634 # consumers that the bundle is best left uncompressed.
635 self.prefercompressed = True
635 self.prefercompressed = True
636
636
637 def setcompression(self, alg, compopts=None):
637 def setcompression(self, alg, compopts=None):
638 """setup core part compression to <alg>"""
638 """setup core part compression to <alg>"""
639 if alg in (None, b'UN'):
639 if alg in (None, b'UN'):
640 return
640 return
641 assert not any(n.lower() == b'compression' for n, v in self._params)
641 assert not any(n.lower() == b'compression' for n, v in self._params)
642 self.addparam(b'Compression', alg)
642 self.addparam(b'Compression', alg)
643 self._compengine = util.compengines.forbundletype(alg)
643 self._compengine = util.compengines.forbundletype(alg)
644 self._compopts = compopts
644 self._compopts = compopts
645
645
646 @property
646 @property
647 def nbparts(self):
647 def nbparts(self):
648 """total number of parts added to the bundler"""
648 """total number of parts added to the bundler"""
649 return len(self._parts)
649 return len(self._parts)
650
650
651 # methods used to defines the bundle2 content
651 # methods used to defines the bundle2 content
652 def addparam(self, name, value=None):
652 def addparam(self, name, value=None):
653 """add a stream level parameter"""
653 """add a stream level parameter"""
654 if not name:
654 if not name:
655 raise error.ProgrammingError(b'empty parameter name')
655 raise error.ProgrammingError(b'empty parameter name')
656 if name[0:1] not in pycompat.bytestr(
656 if name[0:1] not in pycompat.bytestr(
657 string.ascii_letters # pytype: disable=wrong-arg-types
657 string.ascii_letters # pytype: disable=wrong-arg-types
658 ):
658 ):
659 raise error.ProgrammingError(
659 raise error.ProgrammingError(
660 b'non letter first character: %s' % name
660 b'non letter first character: %s' % name
661 )
661 )
662 self._params.append((name, value))
662 self._params.append((name, value))
663
663
664 def addpart(self, part):
664 def addpart(self, part):
665 """add a new part to the bundle2 container
665 """add a new part to the bundle2 container
666
666
667 Parts contains the actual applicative payload."""
667 Parts contains the actual applicative payload."""
668 assert part.id is None
668 assert part.id is None
669 part.id = len(self._parts) # very cheap counter
669 part.id = len(self._parts) # very cheap counter
670 self._parts.append(part)
670 self._parts.append(part)
671
671
672 def newpart(self, typeid, *args, **kwargs):
672 def newpart(self, typeid, *args, **kwargs):
673 """create a new part and add it to the containers
673 """create a new part and add it to the containers
674
674
675 As the part is directly added to the containers. For now, this means
675 As the part is directly added to the containers. For now, this means
676 that any failure to properly initialize the part after calling
676 that any failure to properly initialize the part after calling
677 ``newpart`` should result in a failure of the whole bundling process.
677 ``newpart`` should result in a failure of the whole bundling process.
678
678
679 You can still fall back to manually create and add if you need better
679 You can still fall back to manually create and add if you need better
680 control."""
680 control."""
681 part = bundlepart(typeid, *args, **kwargs)
681 part = bundlepart(typeid, *args, **kwargs)
682 self.addpart(part)
682 self.addpart(part)
683 return part
683 return part
684
684
685 # methods used to generate the bundle2 stream
685 # methods used to generate the bundle2 stream
686 def getchunks(self):
686 def getchunks(self):
687 if self.ui.debugflag:
687 if self.ui.debugflag:
688 msg = [b'bundle2-output-bundle: "%s",' % self._magicstring]
688 msg = [b'bundle2-output-bundle: "%s",' % self._magicstring]
689 if self._params:
689 if self._params:
690 msg.append(b' (%i params)' % len(self._params))
690 msg.append(b' (%i params)' % len(self._params))
691 msg.append(b' %i parts total\n' % len(self._parts))
691 msg.append(b' %i parts total\n' % len(self._parts))
692 self.ui.debug(b''.join(msg))
692 self.ui.debug(b''.join(msg))
693 outdebug(self.ui, b'start emission of %s stream' % self._magicstring)
693 outdebug(self.ui, b'start emission of %s stream' % self._magicstring)
694 yield self._magicstring
694 yield self._magicstring
695 param = self._paramchunk()
695 param = self._paramchunk()
696 outdebug(self.ui, b'bundle parameter: %s' % param)
696 outdebug(self.ui, b'bundle parameter: %s' % param)
697 yield _pack(_fstreamparamsize, len(param))
697 yield _pack(_fstreamparamsize, len(param))
698 if param:
698 if param:
699 yield param
699 yield param
700 for chunk in self._compengine.compressstream(
700 for chunk in self._compengine.compressstream(
701 self._getcorechunk(), self._compopts
701 self._getcorechunk(), self._compopts
702 ):
702 ):
703 yield chunk
703 yield chunk
704
704
705 def _paramchunk(self):
705 def _paramchunk(self):
706 """return a encoded version of all stream parameters"""
706 """return a encoded version of all stream parameters"""
707 blocks = []
707 blocks = []
708 for par, value in self._params:
708 for par, value in self._params:
709 par = urlreq.quote(par)
709 par = urlreq.quote(par)
710 if value is not None:
710 if value is not None:
711 value = urlreq.quote(value)
711 value = urlreq.quote(value)
712 par = b'%s=%s' % (par, value)
712 par = b'%s=%s' % (par, value)
713 blocks.append(par)
713 blocks.append(par)
714 return b' '.join(blocks)
714 return b' '.join(blocks)
715
715
716 def _getcorechunk(self):
716 def _getcorechunk(self):
717 """yield chunk for the core part of the bundle
717 """yield chunk for the core part of the bundle
718
718
719 (all but headers and parameters)"""
719 (all but headers and parameters)"""
720 outdebug(self.ui, b'start of parts')
720 outdebug(self.ui, b'start of parts')
721 for part in self._parts:
721 for part in self._parts:
722 outdebug(self.ui, b'bundle part: "%s"' % part.type)
722 outdebug(self.ui, b'bundle part: "%s"' % part.type)
723 for chunk in part.getchunks(ui=self.ui):
723 for chunk in part.getchunks(ui=self.ui):
724 yield chunk
724 yield chunk
725 outdebug(self.ui, b'end of bundle')
725 outdebug(self.ui, b'end of bundle')
726 yield _pack(_fpartheadersize, 0)
726 yield _pack(_fpartheadersize, 0)
727
727
728 def salvageoutput(self):
728 def salvageoutput(self):
729 """return a list with a copy of all output parts in the bundle
729 """return a list with a copy of all output parts in the bundle
730
730
731 This is meant to be used during error handling to make sure we preserve
731 This is meant to be used during error handling to make sure we preserve
732 server output"""
732 server output"""
733 salvaged = []
733 salvaged = []
734 for part in self._parts:
734 for part in self._parts:
735 if part.type.startswith(b'output'):
735 if part.type.startswith(b'output'):
736 salvaged.append(part.copy())
736 salvaged.append(part.copy())
737 return salvaged
737 return salvaged
738
738
739
739
740 class unpackermixin(object):
740 class unpackermixin(object):
741 """A mixin to extract bytes and struct data from a stream"""
741 """A mixin to extract bytes and struct data from a stream"""
742
742
743 def __init__(self, fp):
743 def __init__(self, fp):
744 self._fp = fp
744 self._fp = fp
745
745
746 def _unpack(self, format):
746 def _unpack(self, format):
747 """unpack this struct format from the stream
747 """unpack this struct format from the stream
748
748
749 This method is meant for internal usage by the bundle2 protocol only.
749 This method is meant for internal usage by the bundle2 protocol only.
750 They directly manipulate the low level stream including bundle2 level
750 They directly manipulate the low level stream including bundle2 level
751 instruction.
751 instruction.
752
752
753 Do not use it to implement higher-level logic or methods."""
753 Do not use it to implement higher-level logic or methods."""
754 data = self._readexact(struct.calcsize(format))
754 data = self._readexact(struct.calcsize(format))
755 return _unpack(format, data)
755 return _unpack(format, data)
756
756
757 def _readexact(self, size):
757 def _readexact(self, size):
758 """read exactly <size> bytes from the stream
758 """read exactly <size> bytes from the stream
759
759
760 This method is meant for internal usage by the bundle2 protocol only.
760 This method is meant for internal usage by the bundle2 protocol only.
761 They directly manipulate the low level stream including bundle2 level
761 They directly manipulate the low level stream including bundle2 level
762 instruction.
762 instruction.
763
763
764 Do not use it to implement higher-level logic or methods."""
764 Do not use it to implement higher-level logic or methods."""
765 return changegroup.readexactly(self._fp, size)
765 return changegroup.readexactly(self._fp, size)
766
766
767
767
768 def getunbundler(ui, fp, magicstring=None):
768 def getunbundler(ui, fp, magicstring=None):
769 """return a valid unbundler object for a given magicstring"""
769 """return a valid unbundler object for a given magicstring"""
770 if magicstring is None:
770 if magicstring is None:
771 magicstring = changegroup.readexactly(fp, 4)
771 magicstring = changegroup.readexactly(fp, 4)
772 magic, version = magicstring[0:2], magicstring[2:4]
772 magic, version = magicstring[0:2], magicstring[2:4]
773 if magic != b'HG':
773 if magic != b'HG':
774 ui.debug(
774 ui.debug(
775 b"error: invalid magic: %r (version %r), should be 'HG'\n"
775 b"error: invalid magic: %r (version %r), should be 'HG'\n"
776 % (magic, version)
776 % (magic, version)
777 )
777 )
778 raise error.Abort(_(b'not a Mercurial bundle'))
778 raise error.Abort(_(b'not a Mercurial bundle'))
779 unbundlerclass = formatmap.get(version)
779 unbundlerclass = formatmap.get(version)
780 if unbundlerclass is None:
780 if unbundlerclass is None:
781 raise error.Abort(_(b'unknown bundle version %s') % version)
781 raise error.Abort(_(b'unknown bundle version %s') % version)
782 unbundler = unbundlerclass(ui, fp)
782 unbundler = unbundlerclass(ui, fp)
783 indebug(ui, b'start processing of %s stream' % magicstring)
783 indebug(ui, b'start processing of %s stream' % magicstring)
784 return unbundler
784 return unbundler
785
785
786
786
787 class unbundle20(unpackermixin):
787 class unbundle20(unpackermixin):
788 """interpret a bundle2 stream
788 """interpret a bundle2 stream
789
789
790 This class is fed with a binary stream and yields parts through its
790 This class is fed with a binary stream and yields parts through its
791 `iterparts` methods."""
791 `iterparts` methods."""
792
792
793 _magicstring = b'HG20'
793 _magicstring = b'HG20'
794
794
795 def __init__(self, ui, fp):
795 def __init__(self, ui, fp):
796 """If header is specified, we do not read it out of the stream."""
796 """If header is specified, we do not read it out of the stream."""
797 self.ui = ui
797 self.ui = ui
798 self._compengine = util.compengines.forbundletype(b'UN')
798 self._compengine = util.compengines.forbundletype(b'UN')
799 self._compressed = None
799 self._compressed = None
800 super(unbundle20, self).__init__(fp)
800 super(unbundle20, self).__init__(fp)
801
801
802 @util.propertycache
802 @util.propertycache
803 def params(self):
803 def params(self):
804 """dictionary of stream level parameters"""
804 """dictionary of stream level parameters"""
805 indebug(self.ui, b'reading bundle2 stream parameters')
805 indebug(self.ui, b'reading bundle2 stream parameters')
806 params = {}
806 params = {}
807 paramssize = self._unpack(_fstreamparamsize)[0]
807 paramssize = self._unpack(_fstreamparamsize)[0]
808 if paramssize < 0:
808 if paramssize < 0:
809 raise error.BundleValueError(
809 raise error.BundleValueError(
810 b'negative bundle param size: %i' % paramssize
810 b'negative bundle param size: %i' % paramssize
811 )
811 )
812 if paramssize:
812 if paramssize:
813 params = self._readexact(paramssize)
813 params = self._readexact(paramssize)
814 params = self._processallparams(params)
814 params = self._processallparams(params)
815 return params
815 return params
816
816
817 def _processallparams(self, paramsblock):
817 def _processallparams(self, paramsblock):
818 """"""
818 """"""
819 params = util.sortdict()
819 params = util.sortdict()
820 for p in paramsblock.split(b' '):
820 for p in paramsblock.split(b' '):
821 p = p.split(b'=', 1)
821 p = p.split(b'=', 1)
822 p = [urlreq.unquote(i) for i in p]
822 p = [urlreq.unquote(i) for i in p]
823 if len(p) < 2:
823 if len(p) < 2:
824 p.append(None)
824 p.append(None)
825 self._processparam(*p)
825 self._processparam(*p)
826 params[p[0]] = p[1]
826 params[p[0]] = p[1]
827 return params
827 return params
828
828
829 def _processparam(self, name, value):
829 def _processparam(self, name, value):
830 """process a parameter, applying its effect if needed
830 """process a parameter, applying its effect if needed
831
831
832 Parameter starting with a lower case letter are advisory and will be
832 Parameter starting with a lower case letter are advisory and will be
833 ignored when unknown. Those starting with an upper case letter are
833 ignored when unknown. Those starting with an upper case letter are
834 mandatory and will this function will raise a KeyError when unknown.
834 mandatory and will this function will raise a KeyError when unknown.
835
835
836 Note: no option are currently supported. Any input will be either
836 Note: no option are currently supported. Any input will be either
837 ignored or failing.
837 ignored or failing.
838 """
838 """
839 if not name:
839 if not name:
840 raise ValueError('empty parameter name')
840 raise ValueError('empty parameter name')
841 if name[0:1] not in pycompat.bytestr(
841 if name[0:1] not in pycompat.bytestr(
842 string.ascii_letters # pytype: disable=wrong-arg-types
842 string.ascii_letters # pytype: disable=wrong-arg-types
843 ):
843 ):
844 raise ValueError('non letter first character: %s' % name)
844 raise ValueError('non letter first character: %s' % name)
845 try:
845 try:
846 handler = b2streamparamsmap[name.lower()]
846 handler = b2streamparamsmap[name.lower()]
847 except KeyError:
847 except KeyError:
848 if name[0:1].islower():
848 if name[0:1].islower():
849 indebug(self.ui, b"ignoring unknown parameter %s" % name)
849 indebug(self.ui, b"ignoring unknown parameter %s" % name)
850 else:
850 else:
851 raise error.BundleUnknownFeatureError(params=(name,))
851 raise error.BundleUnknownFeatureError(params=(name,))
852 else:
852 else:
853 handler(self, name, value)
853 handler(self, name, value)
854
854
855 def _forwardchunks(self):
855 def _forwardchunks(self):
856 """utility to transfer a bundle2 as binary
856 """utility to transfer a bundle2 as binary
857
857
858 This is made necessary by the fact the 'getbundle' command over 'ssh'
858 This is made necessary by the fact the 'getbundle' command over 'ssh'
859 have no way to know then the reply end, relying on the bundle to be
859 have no way to know then the reply end, relying on the bundle to be
860 interpreted to know its end. This is terrible and we are sorry, but we
860 interpreted to know its end. This is terrible and we are sorry, but we
861 needed to move forward to get general delta enabled.
861 needed to move forward to get general delta enabled.
862 """
862 """
863 yield self._magicstring
863 yield self._magicstring
864 assert 'params' not in vars(self)
864 assert 'params' not in vars(self)
865 paramssize = self._unpack(_fstreamparamsize)[0]
865 paramssize = self._unpack(_fstreamparamsize)[0]
866 if paramssize < 0:
866 if paramssize < 0:
867 raise error.BundleValueError(
867 raise error.BundleValueError(
868 b'negative bundle param size: %i' % paramssize
868 b'negative bundle param size: %i' % paramssize
869 )
869 )
870 if paramssize:
870 if paramssize:
871 params = self._readexact(paramssize)
871 params = self._readexact(paramssize)
872 self._processallparams(params)
872 self._processallparams(params)
873 # The payload itself is decompressed below, so drop
873 # The payload itself is decompressed below, so drop
874 # the compression parameter passed down to compensate.
874 # the compression parameter passed down to compensate.
875 outparams = []
875 outparams = []
876 for p in params.split(b' '):
876 for p in params.split(b' '):
877 k, v = p.split(b'=', 1)
877 k, v = p.split(b'=', 1)
878 if k.lower() != b'compression':
878 if k.lower() != b'compression':
879 outparams.append(p)
879 outparams.append(p)
880 outparams = b' '.join(outparams)
880 outparams = b' '.join(outparams)
881 yield _pack(_fstreamparamsize, len(outparams))
881 yield _pack(_fstreamparamsize, len(outparams))
882 yield outparams
882 yield outparams
883 else:
883 else:
884 yield _pack(_fstreamparamsize, paramssize)
884 yield _pack(_fstreamparamsize, paramssize)
885 # From there, payload might need to be decompressed
885 # From there, payload might need to be decompressed
886 self._fp = self._compengine.decompressorreader(self._fp)
886 self._fp = self._compengine.decompressorreader(self._fp)
887 emptycount = 0
887 emptycount = 0
888 while emptycount < 2:
888 while emptycount < 2:
889 # so we can brainlessly loop
889 # so we can brainlessly loop
890 assert _fpartheadersize == _fpayloadsize
890 assert _fpartheadersize == _fpayloadsize
891 size = self._unpack(_fpartheadersize)[0]
891 size = self._unpack(_fpartheadersize)[0]
892 yield _pack(_fpartheadersize, size)
892 yield _pack(_fpartheadersize, size)
893 if size:
893 if size:
894 emptycount = 0
894 emptycount = 0
895 else:
895 else:
896 emptycount += 1
896 emptycount += 1
897 continue
897 continue
898 if size == flaginterrupt:
898 if size == flaginterrupt:
899 continue
899 continue
900 elif size < 0:
900 elif size < 0:
901 raise error.BundleValueError(b'negative chunk size: %i')
901 raise error.BundleValueError(b'negative chunk size: %i')
902 yield self._readexact(size)
902 yield self._readexact(size)
903
903
904 def iterparts(self, seekable=False):
904 def iterparts(self, seekable=False):
905 """yield all parts contained in the stream"""
905 """yield all parts contained in the stream"""
906 cls = seekableunbundlepart if seekable else unbundlepart
906 cls = seekableunbundlepart if seekable else unbundlepart
907 # make sure param have been loaded
907 # make sure param have been loaded
908 self.params
908 self.params
909 # From there, payload need to be decompressed
909 # From there, payload need to be decompressed
910 self._fp = self._compengine.decompressorreader(self._fp)
910 self._fp = self._compengine.decompressorreader(self._fp)
911 indebug(self.ui, b'start extraction of bundle2 parts')
911 indebug(self.ui, b'start extraction of bundle2 parts')
912 headerblock = self._readpartheader()
912 headerblock = self._readpartheader()
913 while headerblock is not None:
913 while headerblock is not None:
914 part = cls(self.ui, headerblock, self._fp)
914 part = cls(self.ui, headerblock, self._fp)
915 yield part
915 yield part
916 # Ensure part is fully consumed so we can start reading the next
916 # Ensure part is fully consumed so we can start reading the next
917 # part.
917 # part.
918 part.consume()
918 part.consume()
919
919
920 headerblock = self._readpartheader()
920 headerblock = self._readpartheader()
921 indebug(self.ui, b'end of bundle2 stream')
921 indebug(self.ui, b'end of bundle2 stream')
922
922
923 def _readpartheader(self):
923 def _readpartheader(self):
924 """reads a part header size and return the bytes blob
924 """reads a part header size and return the bytes blob
925
925
926 returns None if empty"""
926 returns None if empty"""
927 headersize = self._unpack(_fpartheadersize)[0]
927 headersize = self._unpack(_fpartheadersize)[0]
928 if headersize < 0:
928 if headersize < 0:
929 raise error.BundleValueError(
929 raise error.BundleValueError(
930 b'negative part header size: %i' % headersize
930 b'negative part header size: %i' % headersize
931 )
931 )
932 indebug(self.ui, b'part header size: %i' % headersize)
932 indebug(self.ui, b'part header size: %i' % headersize)
933 if headersize:
933 if headersize:
934 return self._readexact(headersize)
934 return self._readexact(headersize)
935 return None
935 return None
936
936
937 def compressed(self):
937 def compressed(self):
938 self.params # load params
938 self.params # load params
939 return self._compressed
939 return self._compressed
940
940
941 def close(self):
941 def close(self):
942 """close underlying file"""
942 """close underlying file"""
943 if util.safehasattr(self._fp, 'close'):
943 if util.safehasattr(self._fp, 'close'):
944 return self._fp.close()
944 return self._fp.close()
945
945
946
946
947 formatmap = {b'20': unbundle20}
947 formatmap = {b'20': unbundle20}
948
948
949 b2streamparamsmap = {}
949 b2streamparamsmap = {}
950
950
951
951
952 def b2streamparamhandler(name):
952 def b2streamparamhandler(name):
953 """register a handler for a stream level parameter"""
953 """register a handler for a stream level parameter"""
954
954
955 def decorator(func):
955 def decorator(func):
956 assert name not in formatmap
956 assert name not in formatmap
957 b2streamparamsmap[name] = func
957 b2streamparamsmap[name] = func
958 return func
958 return func
959
959
960 return decorator
960 return decorator
961
961
962
962
963 @b2streamparamhandler(b'compression')
963 @b2streamparamhandler(b'compression')
964 def processcompression(unbundler, param, value):
964 def processcompression(unbundler, param, value):
965 """read compression parameter and install payload decompression"""
965 """read compression parameter and install payload decompression"""
966 if value not in util.compengines.supportedbundletypes:
966 if value not in util.compengines.supportedbundletypes:
967 raise error.BundleUnknownFeatureError(params=(param,), values=(value,))
967 raise error.BundleUnknownFeatureError(params=(param,), values=(value,))
968 unbundler._compengine = util.compengines.forbundletype(value)
968 unbundler._compengine = util.compengines.forbundletype(value)
969 if value is not None:
969 if value is not None:
970 unbundler._compressed = True
970 unbundler._compressed = True
971
971
972
972
973 class bundlepart(object):
973 class bundlepart(object):
974 """A bundle2 part contains application level payload
974 """A bundle2 part contains application level payload
975
975
976 The part `type` is used to route the part to the application level
976 The part `type` is used to route the part to the application level
977 handler.
977 handler.
978
978
979 The part payload is contained in ``part.data``. It could be raw bytes or a
979 The part payload is contained in ``part.data``. It could be raw bytes or a
980 generator of byte chunks.
980 generator of byte chunks.
981
981
982 You can add parameters to the part using the ``addparam`` method.
982 You can add parameters to the part using the ``addparam`` method.
983 Parameters can be either mandatory (default) or advisory. Remote side
983 Parameters can be either mandatory (default) or advisory. Remote side
984 should be able to safely ignore the advisory ones.
984 should be able to safely ignore the advisory ones.
985
985
986 Both data and parameters cannot be modified after the generation has begun.
986 Both data and parameters cannot be modified after the generation has begun.
987 """
987 """
988
988
989 def __init__(
989 def __init__(
990 self,
990 self,
991 parttype,
991 parttype,
992 mandatoryparams=(),
992 mandatoryparams=(),
993 advisoryparams=(),
993 advisoryparams=(),
994 data=b'',
994 data=b'',
995 mandatory=True,
995 mandatory=True,
996 ):
996 ):
997 validateparttype(parttype)
997 validateparttype(parttype)
998 self.id = None
998 self.id = None
999 self.type = parttype
999 self.type = parttype
1000 self._data = data
1000 self._data = data
1001 self._mandatoryparams = list(mandatoryparams)
1001 self._mandatoryparams = list(mandatoryparams)
1002 self._advisoryparams = list(advisoryparams)
1002 self._advisoryparams = list(advisoryparams)
1003 # checking for duplicated entries
1003 # checking for duplicated entries
1004 self._seenparams = set()
1004 self._seenparams = set()
1005 for pname, __ in self._mandatoryparams + self._advisoryparams:
1005 for pname, __ in self._mandatoryparams + self._advisoryparams:
1006 if pname in self._seenparams:
1006 if pname in self._seenparams:
1007 raise error.ProgrammingError(b'duplicated params: %s' % pname)
1007 raise error.ProgrammingError(b'duplicated params: %s' % pname)
1008 self._seenparams.add(pname)
1008 self._seenparams.add(pname)
1009 # status of the part's generation:
1009 # status of the part's generation:
1010 # - None: not started,
1010 # - None: not started,
1011 # - False: currently generated,
1011 # - False: currently generated,
1012 # - True: generation done.
1012 # - True: generation done.
1013 self._generated = None
1013 self._generated = None
1014 self.mandatory = mandatory
1014 self.mandatory = mandatory
1015
1015
1016 @encoding.strmethod
1017 def __repr__(self):
1016 def __repr__(self):
1018 cls = b"%s.%s" % (self.__class__.__module__, self.__class__.__name__)
1017 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
1019 return b'<%s object at %x; id: %s; type: %s; mandatory: %s>' % (
1018 return '<%s object at %x; id: %s; type: %s; mandatory: %s>' % (
1020 cls,
1019 cls,
1021 id(self),
1020 id(self),
1022 self.id,
1021 self.id,
1023 self.type,
1022 self.type,
1024 self.mandatory,
1023 self.mandatory,
1025 )
1024 )
1026
1025
1027 def copy(self):
1026 def copy(self):
1028 """return a copy of the part
1027 """return a copy of the part
1029
1028
1030 The new part have the very same content but no partid assigned yet.
1029 The new part have the very same content but no partid assigned yet.
1031 Parts with generated data cannot be copied."""
1030 Parts with generated data cannot be copied."""
1032 assert not util.safehasattr(self.data, 'next')
1031 assert not util.safehasattr(self.data, 'next')
1033 return self.__class__(
1032 return self.__class__(
1034 self.type,
1033 self.type,
1035 self._mandatoryparams,
1034 self._mandatoryparams,
1036 self._advisoryparams,
1035 self._advisoryparams,
1037 self._data,
1036 self._data,
1038 self.mandatory,
1037 self.mandatory,
1039 )
1038 )
1040
1039
1041 # methods used to defines the part content
1040 # methods used to defines the part content
1042 @property
1041 @property
1043 def data(self):
1042 def data(self):
1044 return self._data
1043 return self._data
1045
1044
1046 @data.setter
1045 @data.setter
1047 def data(self, data):
1046 def data(self, data):
1048 if self._generated is not None:
1047 if self._generated is not None:
1049 raise error.ReadOnlyPartError(b'part is being generated')
1048 raise error.ReadOnlyPartError(b'part is being generated')
1050 self._data = data
1049 self._data = data
1051
1050
1052 @property
1051 @property
1053 def mandatoryparams(self):
1052 def mandatoryparams(self):
1054 # make it an immutable tuple to force people through ``addparam``
1053 # make it an immutable tuple to force people through ``addparam``
1055 return tuple(self._mandatoryparams)
1054 return tuple(self._mandatoryparams)
1056
1055
1057 @property
1056 @property
1058 def advisoryparams(self):
1057 def advisoryparams(self):
1059 # make it an immutable tuple to force people through ``addparam``
1058 # make it an immutable tuple to force people through ``addparam``
1060 return tuple(self._advisoryparams)
1059 return tuple(self._advisoryparams)
1061
1060
1062 def addparam(self, name, value=b'', mandatory=True):
1061 def addparam(self, name, value=b'', mandatory=True):
1063 """add a parameter to the part
1062 """add a parameter to the part
1064
1063
1065 If 'mandatory' is set to True, the remote handler must claim support
1064 If 'mandatory' is set to True, the remote handler must claim support
1066 for this parameter or the unbundling will be aborted.
1065 for this parameter or the unbundling will be aborted.
1067
1066
1068 The 'name' and 'value' cannot exceed 255 bytes each.
1067 The 'name' and 'value' cannot exceed 255 bytes each.
1069 """
1068 """
1070 if self._generated is not None:
1069 if self._generated is not None:
1071 raise error.ReadOnlyPartError(b'part is being generated')
1070 raise error.ReadOnlyPartError(b'part is being generated')
1072 if name in self._seenparams:
1071 if name in self._seenparams:
1073 raise ValueError(b'duplicated params: %s' % name)
1072 raise ValueError(b'duplicated params: %s' % name)
1074 self._seenparams.add(name)
1073 self._seenparams.add(name)
1075 params = self._advisoryparams
1074 params = self._advisoryparams
1076 if mandatory:
1075 if mandatory:
1077 params = self._mandatoryparams
1076 params = self._mandatoryparams
1078 params.append((name, value))
1077 params.append((name, value))
1079
1078
1080 # methods used to generates the bundle2 stream
1079 # methods used to generates the bundle2 stream
1081 def getchunks(self, ui):
1080 def getchunks(self, ui):
1082 if self._generated is not None:
1081 if self._generated is not None:
1083 raise error.ProgrammingError(b'part can only be consumed once')
1082 raise error.ProgrammingError(b'part can only be consumed once')
1084 self._generated = False
1083 self._generated = False
1085
1084
1086 if ui.debugflag:
1085 if ui.debugflag:
1087 msg = [b'bundle2-output-part: "%s"' % self.type]
1086 msg = [b'bundle2-output-part: "%s"' % self.type]
1088 if not self.mandatory:
1087 if not self.mandatory:
1089 msg.append(b' (advisory)')
1088 msg.append(b' (advisory)')
1090 nbmp = len(self.mandatoryparams)
1089 nbmp = len(self.mandatoryparams)
1091 nbap = len(self.advisoryparams)
1090 nbap = len(self.advisoryparams)
1092 if nbmp or nbap:
1091 if nbmp or nbap:
1093 msg.append(b' (params:')
1092 msg.append(b' (params:')
1094 if nbmp:
1093 if nbmp:
1095 msg.append(b' %i mandatory' % nbmp)
1094 msg.append(b' %i mandatory' % nbmp)
1096 if nbap:
1095 if nbap:
1097 msg.append(b' %i advisory' % nbmp)
1096 msg.append(b' %i advisory' % nbmp)
1098 msg.append(b')')
1097 msg.append(b')')
1099 if not self.data:
1098 if not self.data:
1100 msg.append(b' empty payload')
1099 msg.append(b' empty payload')
1101 elif util.safehasattr(self.data, 'next') or util.safehasattr(
1100 elif util.safehasattr(self.data, 'next') or util.safehasattr(
1102 self.data, b'__next__'
1101 self.data, b'__next__'
1103 ):
1102 ):
1104 msg.append(b' streamed payload')
1103 msg.append(b' streamed payload')
1105 else:
1104 else:
1106 msg.append(b' %i bytes payload' % len(self.data))
1105 msg.append(b' %i bytes payload' % len(self.data))
1107 msg.append(b'\n')
1106 msg.append(b'\n')
1108 ui.debug(b''.join(msg))
1107 ui.debug(b''.join(msg))
1109
1108
1110 #### header
1109 #### header
1111 if self.mandatory:
1110 if self.mandatory:
1112 parttype = self.type.upper()
1111 parttype = self.type.upper()
1113 else:
1112 else:
1114 parttype = self.type.lower()
1113 parttype = self.type.lower()
1115 outdebug(ui, b'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1114 outdebug(ui, b'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1116 ## parttype
1115 ## parttype
1117 header = [
1116 header = [
1118 _pack(_fparttypesize, len(parttype)),
1117 _pack(_fparttypesize, len(parttype)),
1119 parttype,
1118 parttype,
1120 _pack(_fpartid, self.id),
1119 _pack(_fpartid, self.id),
1121 ]
1120 ]
1122 ## parameters
1121 ## parameters
1123 # count
1122 # count
1124 manpar = self.mandatoryparams
1123 manpar = self.mandatoryparams
1125 advpar = self.advisoryparams
1124 advpar = self.advisoryparams
1126 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1125 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1127 # size
1126 # size
1128 parsizes = []
1127 parsizes = []
1129 for key, value in manpar:
1128 for key, value in manpar:
1130 parsizes.append(len(key))
1129 parsizes.append(len(key))
1131 parsizes.append(len(value))
1130 parsizes.append(len(value))
1132 for key, value in advpar:
1131 for key, value in advpar:
1133 parsizes.append(len(key))
1132 parsizes.append(len(key))
1134 parsizes.append(len(value))
1133 parsizes.append(len(value))
1135 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1134 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1136 header.append(paramsizes)
1135 header.append(paramsizes)
1137 # key, value
1136 # key, value
1138 for key, value in manpar:
1137 for key, value in manpar:
1139 header.append(key)
1138 header.append(key)
1140 header.append(value)
1139 header.append(value)
1141 for key, value in advpar:
1140 for key, value in advpar:
1142 header.append(key)
1141 header.append(key)
1143 header.append(value)
1142 header.append(value)
1144 ## finalize header
1143 ## finalize header
1145 try:
1144 try:
1146 headerchunk = b''.join(header)
1145 headerchunk = b''.join(header)
1147 except TypeError:
1146 except TypeError:
1148 raise TypeError(
1147 raise TypeError(
1149 'Found a non-bytes trying to '
1148 'Found a non-bytes trying to '
1150 'build bundle part header: %r' % header
1149 'build bundle part header: %r' % header
1151 )
1150 )
1152 outdebug(ui, b'header chunk size: %i' % len(headerchunk))
1151 outdebug(ui, b'header chunk size: %i' % len(headerchunk))
1153 yield _pack(_fpartheadersize, len(headerchunk))
1152 yield _pack(_fpartheadersize, len(headerchunk))
1154 yield headerchunk
1153 yield headerchunk
1155 ## payload
1154 ## payload
1156 try:
1155 try:
1157 for chunk in self._payloadchunks():
1156 for chunk in self._payloadchunks():
1158 outdebug(ui, b'payload chunk size: %i' % len(chunk))
1157 outdebug(ui, b'payload chunk size: %i' % len(chunk))
1159 yield _pack(_fpayloadsize, len(chunk))
1158 yield _pack(_fpayloadsize, len(chunk))
1160 yield chunk
1159 yield chunk
1161 except GeneratorExit:
1160 except GeneratorExit:
1162 # GeneratorExit means that nobody is listening for our
1161 # GeneratorExit means that nobody is listening for our
1163 # results anyway, so just bail quickly rather than trying
1162 # results anyway, so just bail quickly rather than trying
1164 # to produce an error part.
1163 # to produce an error part.
1165 ui.debug(b'bundle2-generatorexit\n')
1164 ui.debug(b'bundle2-generatorexit\n')
1166 raise
1165 raise
1167 except BaseException as exc:
1166 except BaseException as exc:
1168 bexc = stringutil.forcebytestr(exc)
1167 bexc = stringutil.forcebytestr(exc)
1169 # backup exception data for later
1168 # backup exception data for later
1170 ui.debug(
1169 ui.debug(
1171 b'bundle2-input-stream-interrupt: encoding exception %s' % bexc
1170 b'bundle2-input-stream-interrupt: encoding exception %s' % bexc
1172 )
1171 )
1173 tb = sys.exc_info()[2]
1172 tb = sys.exc_info()[2]
1174 msg = b'unexpected error: %s' % bexc
1173 msg = b'unexpected error: %s' % bexc
1175 interpart = bundlepart(
1174 interpart = bundlepart(
1176 b'error:abort', [(b'message', msg)], mandatory=False
1175 b'error:abort', [(b'message', msg)], mandatory=False
1177 )
1176 )
1178 interpart.id = 0
1177 interpart.id = 0
1179 yield _pack(_fpayloadsize, -1)
1178 yield _pack(_fpayloadsize, -1)
1180 for chunk in interpart.getchunks(ui=ui):
1179 for chunk in interpart.getchunks(ui=ui):
1181 yield chunk
1180 yield chunk
1182 outdebug(ui, b'closing payload chunk')
1181 outdebug(ui, b'closing payload chunk')
1183 # abort current part payload
1182 # abort current part payload
1184 yield _pack(_fpayloadsize, 0)
1183 yield _pack(_fpayloadsize, 0)
1185 pycompat.raisewithtb(exc, tb)
1184 pycompat.raisewithtb(exc, tb)
1186 # end of payload
1185 # end of payload
1187 outdebug(ui, b'closing payload chunk')
1186 outdebug(ui, b'closing payload chunk')
1188 yield _pack(_fpayloadsize, 0)
1187 yield _pack(_fpayloadsize, 0)
1189 self._generated = True
1188 self._generated = True
1190
1189
1191 def _payloadchunks(self):
1190 def _payloadchunks(self):
1192 """yield chunks of a the part payload
1191 """yield chunks of a the part payload
1193
1192
1194 Exists to handle the different methods to provide data to a part."""
1193 Exists to handle the different methods to provide data to a part."""
1195 # we only support fixed size data now.
1194 # we only support fixed size data now.
1196 # This will be improved in the future.
1195 # This will be improved in the future.
1197 if util.safehasattr(self.data, 'next') or util.safehasattr(
1196 if util.safehasattr(self.data, 'next') or util.safehasattr(
1198 self.data, b'__next__'
1197 self.data, b'__next__'
1199 ):
1198 ):
1200 buff = util.chunkbuffer(self.data)
1199 buff = util.chunkbuffer(self.data)
1201 chunk = buff.read(preferedchunksize)
1200 chunk = buff.read(preferedchunksize)
1202 while chunk:
1201 while chunk:
1203 yield chunk
1202 yield chunk
1204 chunk = buff.read(preferedchunksize)
1203 chunk = buff.read(preferedchunksize)
1205 elif len(self.data):
1204 elif len(self.data):
1206 yield self.data
1205 yield self.data
1207
1206
1208
1207
1209 flaginterrupt = -1
1208 flaginterrupt = -1
1210
1209
1211
1210
1212 class interrupthandler(unpackermixin):
1211 class interrupthandler(unpackermixin):
1213 """read one part and process it with restricted capability
1212 """read one part and process it with restricted capability
1214
1213
1215 This allows to transmit exception raised on the producer size during part
1214 This allows to transmit exception raised on the producer size during part
1216 iteration while the consumer is reading a part.
1215 iteration while the consumer is reading a part.
1217
1216
1218 Part processed in this manner only have access to a ui object,"""
1217 Part processed in this manner only have access to a ui object,"""
1219
1218
1220 def __init__(self, ui, fp):
1219 def __init__(self, ui, fp):
1221 super(interrupthandler, self).__init__(fp)
1220 super(interrupthandler, self).__init__(fp)
1222 self.ui = ui
1221 self.ui = ui
1223
1222
1224 def _readpartheader(self):
1223 def _readpartheader(self):
1225 """reads a part header size and return the bytes blob
1224 """reads a part header size and return the bytes blob
1226
1225
1227 returns None if empty"""
1226 returns None if empty"""
1228 headersize = self._unpack(_fpartheadersize)[0]
1227 headersize = self._unpack(_fpartheadersize)[0]
1229 if headersize < 0:
1228 if headersize < 0:
1230 raise error.BundleValueError(
1229 raise error.BundleValueError(
1231 b'negative part header size: %i' % headersize
1230 b'negative part header size: %i' % headersize
1232 )
1231 )
1233 indebug(self.ui, b'part header size: %i\n' % headersize)
1232 indebug(self.ui, b'part header size: %i\n' % headersize)
1234 if headersize:
1233 if headersize:
1235 return self._readexact(headersize)
1234 return self._readexact(headersize)
1236 return None
1235 return None
1237
1236
1238 def __call__(self):
1237 def __call__(self):
1239
1238
1240 self.ui.debug(
1239 self.ui.debug(
1241 b'bundle2-input-stream-interrupt: opening out of band context\n'
1240 b'bundle2-input-stream-interrupt: opening out of band context\n'
1242 )
1241 )
1243 indebug(self.ui, b'bundle2 stream interruption, looking for a part.')
1242 indebug(self.ui, b'bundle2 stream interruption, looking for a part.')
1244 headerblock = self._readpartheader()
1243 headerblock = self._readpartheader()
1245 if headerblock is None:
1244 if headerblock is None:
1246 indebug(self.ui, b'no part found during interruption.')
1245 indebug(self.ui, b'no part found during interruption.')
1247 return
1246 return
1248 part = unbundlepart(self.ui, headerblock, self._fp)
1247 part = unbundlepart(self.ui, headerblock, self._fp)
1249 op = interruptoperation(self.ui)
1248 op = interruptoperation(self.ui)
1250 hardabort = False
1249 hardabort = False
1251 try:
1250 try:
1252 _processpart(op, part)
1251 _processpart(op, part)
1253 except (SystemExit, KeyboardInterrupt):
1252 except (SystemExit, KeyboardInterrupt):
1254 hardabort = True
1253 hardabort = True
1255 raise
1254 raise
1256 finally:
1255 finally:
1257 if not hardabort:
1256 if not hardabort:
1258 part.consume()
1257 part.consume()
1259 self.ui.debug(
1258 self.ui.debug(
1260 b'bundle2-input-stream-interrupt: closing out of band context\n'
1259 b'bundle2-input-stream-interrupt: closing out of band context\n'
1261 )
1260 )
1262
1261
1263
1262
1264 class interruptoperation(object):
1263 class interruptoperation(object):
1265 """A limited operation to be use by part handler during interruption
1264 """A limited operation to be use by part handler during interruption
1266
1265
1267 It only have access to an ui object.
1266 It only have access to an ui object.
1268 """
1267 """
1269
1268
1270 def __init__(self, ui):
1269 def __init__(self, ui):
1271 self.ui = ui
1270 self.ui = ui
1272 self.reply = None
1271 self.reply = None
1273 self.captureoutput = False
1272 self.captureoutput = False
1274
1273
1275 @property
1274 @property
1276 def repo(self):
1275 def repo(self):
1277 raise error.ProgrammingError(b'no repo access from stream interruption')
1276 raise error.ProgrammingError(b'no repo access from stream interruption')
1278
1277
1279 def gettransaction(self):
1278 def gettransaction(self):
1280 raise TransactionUnavailable(b'no repo access from stream interruption')
1279 raise TransactionUnavailable(b'no repo access from stream interruption')
1281
1280
1282
1281
1283 def decodepayloadchunks(ui, fh):
1282 def decodepayloadchunks(ui, fh):
1284 """Reads bundle2 part payload data into chunks.
1283 """Reads bundle2 part payload data into chunks.
1285
1284
1286 Part payload data consists of framed chunks. This function takes
1285 Part payload data consists of framed chunks. This function takes
1287 a file handle and emits those chunks.
1286 a file handle and emits those chunks.
1288 """
1287 """
1289 dolog = ui.configbool(b'devel', b'bundle2.debug')
1288 dolog = ui.configbool(b'devel', b'bundle2.debug')
1290 debug = ui.debug
1289 debug = ui.debug
1291
1290
1292 headerstruct = struct.Struct(_fpayloadsize)
1291 headerstruct = struct.Struct(_fpayloadsize)
1293 headersize = headerstruct.size
1292 headersize = headerstruct.size
1294 unpack = headerstruct.unpack
1293 unpack = headerstruct.unpack
1295
1294
1296 readexactly = changegroup.readexactly
1295 readexactly = changegroup.readexactly
1297 read = fh.read
1296 read = fh.read
1298
1297
1299 chunksize = unpack(readexactly(fh, headersize))[0]
1298 chunksize = unpack(readexactly(fh, headersize))[0]
1300 indebug(ui, b'payload chunk size: %i' % chunksize)
1299 indebug(ui, b'payload chunk size: %i' % chunksize)
1301
1300
1302 # changegroup.readexactly() is inlined below for performance.
1301 # changegroup.readexactly() is inlined below for performance.
1303 while chunksize:
1302 while chunksize:
1304 if chunksize >= 0:
1303 if chunksize >= 0:
1305 s = read(chunksize)
1304 s = read(chunksize)
1306 if len(s) < chunksize:
1305 if len(s) < chunksize:
1307 raise error.Abort(
1306 raise error.Abort(
1308 _(
1307 _(
1309 b'stream ended unexpectedly '
1308 b'stream ended unexpectedly '
1310 b' (got %d bytes, expected %d)'
1309 b' (got %d bytes, expected %d)'
1311 )
1310 )
1312 % (len(s), chunksize)
1311 % (len(s), chunksize)
1313 )
1312 )
1314
1313
1315 yield s
1314 yield s
1316 elif chunksize == flaginterrupt:
1315 elif chunksize == flaginterrupt:
1317 # Interrupt "signal" detected. The regular stream is interrupted
1316 # Interrupt "signal" detected. The regular stream is interrupted
1318 # and a bundle2 part follows. Consume it.
1317 # and a bundle2 part follows. Consume it.
1319 interrupthandler(ui, fh)()
1318 interrupthandler(ui, fh)()
1320 else:
1319 else:
1321 raise error.BundleValueError(
1320 raise error.BundleValueError(
1322 b'negative payload chunk size: %s' % chunksize
1321 b'negative payload chunk size: %s' % chunksize
1323 )
1322 )
1324
1323
1325 s = read(headersize)
1324 s = read(headersize)
1326 if len(s) < headersize:
1325 if len(s) < headersize:
1327 raise error.Abort(
1326 raise error.Abort(
1328 _(b'stream ended unexpectedly (got %d bytes, expected %d)')
1327 _(b'stream ended unexpectedly (got %d bytes, expected %d)')
1329 % (len(s), chunksize)
1328 % (len(s), chunksize)
1330 )
1329 )
1331
1330
1332 chunksize = unpack(s)[0]
1331 chunksize = unpack(s)[0]
1333
1332
1334 # indebug() inlined for performance.
1333 # indebug() inlined for performance.
1335 if dolog:
1334 if dolog:
1336 debug(b'bundle2-input: payload chunk size: %i\n' % chunksize)
1335 debug(b'bundle2-input: payload chunk size: %i\n' % chunksize)
1337
1336
1338
1337
1339 class unbundlepart(unpackermixin):
1338 class unbundlepart(unpackermixin):
1340 """a bundle part read from a bundle"""
1339 """a bundle part read from a bundle"""
1341
1340
1342 def __init__(self, ui, header, fp):
1341 def __init__(self, ui, header, fp):
1343 super(unbundlepart, self).__init__(fp)
1342 super(unbundlepart, self).__init__(fp)
1344 self._seekable = util.safehasattr(fp, 'seek') and util.safehasattr(
1343 self._seekable = util.safehasattr(fp, 'seek') and util.safehasattr(
1345 fp, b'tell'
1344 fp, b'tell'
1346 )
1345 )
1347 self.ui = ui
1346 self.ui = ui
1348 # unbundle state attr
1347 # unbundle state attr
1349 self._headerdata = header
1348 self._headerdata = header
1350 self._headeroffset = 0
1349 self._headeroffset = 0
1351 self._initialized = False
1350 self._initialized = False
1352 self.consumed = False
1351 self.consumed = False
1353 # part data
1352 # part data
1354 self.id = None
1353 self.id = None
1355 self.type = None
1354 self.type = None
1356 self.mandatoryparams = None
1355 self.mandatoryparams = None
1357 self.advisoryparams = None
1356 self.advisoryparams = None
1358 self.params = None
1357 self.params = None
1359 self.mandatorykeys = ()
1358 self.mandatorykeys = ()
1360 self._readheader()
1359 self._readheader()
1361 self._mandatory = None
1360 self._mandatory = None
1362 self._pos = 0
1361 self._pos = 0
1363
1362
1364 def _fromheader(self, size):
1363 def _fromheader(self, size):
1365 """return the next <size> byte from the header"""
1364 """return the next <size> byte from the header"""
1366 offset = self._headeroffset
1365 offset = self._headeroffset
1367 data = self._headerdata[offset : (offset + size)]
1366 data = self._headerdata[offset : (offset + size)]
1368 self._headeroffset = offset + size
1367 self._headeroffset = offset + size
1369 return data
1368 return data
1370
1369
1371 def _unpackheader(self, format):
1370 def _unpackheader(self, format):
1372 """read given format from header
1371 """read given format from header
1373
1372
1374 This automatically compute the size of the format to read."""
1373 This automatically compute the size of the format to read."""
1375 data = self._fromheader(struct.calcsize(format))
1374 data = self._fromheader(struct.calcsize(format))
1376 return _unpack(format, data)
1375 return _unpack(format, data)
1377
1376
1378 def _initparams(self, mandatoryparams, advisoryparams):
1377 def _initparams(self, mandatoryparams, advisoryparams):
1379 """internal function to setup all logic related parameters"""
1378 """internal function to setup all logic related parameters"""
1380 # make it read only to prevent people touching it by mistake.
1379 # make it read only to prevent people touching it by mistake.
1381 self.mandatoryparams = tuple(mandatoryparams)
1380 self.mandatoryparams = tuple(mandatoryparams)
1382 self.advisoryparams = tuple(advisoryparams)
1381 self.advisoryparams = tuple(advisoryparams)
1383 # user friendly UI
1382 # user friendly UI
1384 self.params = util.sortdict(self.mandatoryparams)
1383 self.params = util.sortdict(self.mandatoryparams)
1385 self.params.update(self.advisoryparams)
1384 self.params.update(self.advisoryparams)
1386 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1385 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1387
1386
1388 def _readheader(self):
1387 def _readheader(self):
1389 """read the header and setup the object"""
1388 """read the header and setup the object"""
1390 typesize = self._unpackheader(_fparttypesize)[0]
1389 typesize = self._unpackheader(_fparttypesize)[0]
1391 self.type = self._fromheader(typesize)
1390 self.type = self._fromheader(typesize)
1392 indebug(self.ui, b'part type: "%s"' % self.type)
1391 indebug(self.ui, b'part type: "%s"' % self.type)
1393 self.id = self._unpackheader(_fpartid)[0]
1392 self.id = self._unpackheader(_fpartid)[0]
1394 indebug(self.ui, b'part id: "%s"' % pycompat.bytestr(self.id))
1393 indebug(self.ui, b'part id: "%s"' % pycompat.bytestr(self.id))
1395 # extract mandatory bit from type
1394 # extract mandatory bit from type
1396 self.mandatory = self.type != self.type.lower()
1395 self.mandatory = self.type != self.type.lower()
1397 self.type = self.type.lower()
1396 self.type = self.type.lower()
1398 ## reading parameters
1397 ## reading parameters
1399 # param count
1398 # param count
1400 mancount, advcount = self._unpackheader(_fpartparamcount)
1399 mancount, advcount = self._unpackheader(_fpartparamcount)
1401 indebug(self.ui, b'part parameters: %i' % (mancount + advcount))
1400 indebug(self.ui, b'part parameters: %i' % (mancount + advcount))
1402 # param size
1401 # param size
1403 fparamsizes = _makefpartparamsizes(mancount + advcount)
1402 fparamsizes = _makefpartparamsizes(mancount + advcount)
1404 paramsizes = self._unpackheader(fparamsizes)
1403 paramsizes = self._unpackheader(fparamsizes)
1405 # make it a list of couple again
1404 # make it a list of couple again
1406 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1405 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1407 # split mandatory from advisory
1406 # split mandatory from advisory
1408 mansizes = paramsizes[:mancount]
1407 mansizes = paramsizes[:mancount]
1409 advsizes = paramsizes[mancount:]
1408 advsizes = paramsizes[mancount:]
1410 # retrieve param value
1409 # retrieve param value
1411 manparams = []
1410 manparams = []
1412 for key, value in mansizes:
1411 for key, value in mansizes:
1413 manparams.append((self._fromheader(key), self._fromheader(value)))
1412 manparams.append((self._fromheader(key), self._fromheader(value)))
1414 advparams = []
1413 advparams = []
1415 for key, value in advsizes:
1414 for key, value in advsizes:
1416 advparams.append((self._fromheader(key), self._fromheader(value)))
1415 advparams.append((self._fromheader(key), self._fromheader(value)))
1417 self._initparams(manparams, advparams)
1416 self._initparams(manparams, advparams)
1418 ## part payload
1417 ## part payload
1419 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1418 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1420 # we read the data, tell it
1419 # we read the data, tell it
1421 self._initialized = True
1420 self._initialized = True
1422
1421
1423 def _payloadchunks(self):
1422 def _payloadchunks(self):
1424 """Generator of decoded chunks in the payload."""
1423 """Generator of decoded chunks in the payload."""
1425 return decodepayloadchunks(self.ui, self._fp)
1424 return decodepayloadchunks(self.ui, self._fp)
1426
1425
1427 def consume(self):
1426 def consume(self):
1428 """Read the part payload until completion.
1427 """Read the part payload until completion.
1429
1428
1430 By consuming the part data, the underlying stream read offset will
1429 By consuming the part data, the underlying stream read offset will
1431 be advanced to the next part (or end of stream).
1430 be advanced to the next part (or end of stream).
1432 """
1431 """
1433 if self.consumed:
1432 if self.consumed:
1434 return
1433 return
1435
1434
1436 chunk = self.read(32768)
1435 chunk = self.read(32768)
1437 while chunk:
1436 while chunk:
1438 self._pos += len(chunk)
1437 self._pos += len(chunk)
1439 chunk = self.read(32768)
1438 chunk = self.read(32768)
1440
1439
1441 def read(self, size=None):
1440 def read(self, size=None):
1442 """read payload data"""
1441 """read payload data"""
1443 if not self._initialized:
1442 if not self._initialized:
1444 self._readheader()
1443 self._readheader()
1445 if size is None:
1444 if size is None:
1446 data = self._payloadstream.read()
1445 data = self._payloadstream.read()
1447 else:
1446 else:
1448 data = self._payloadstream.read(size)
1447 data = self._payloadstream.read(size)
1449 self._pos += len(data)
1448 self._pos += len(data)
1450 if size is None or len(data) < size:
1449 if size is None or len(data) < size:
1451 if not self.consumed and self._pos:
1450 if not self.consumed and self._pos:
1452 self.ui.debug(
1451 self.ui.debug(
1453 b'bundle2-input-part: total payload size %i\n' % self._pos
1452 b'bundle2-input-part: total payload size %i\n' % self._pos
1454 )
1453 )
1455 self.consumed = True
1454 self.consumed = True
1456 return data
1455 return data
1457
1456
1458
1457
1459 class seekableunbundlepart(unbundlepart):
1458 class seekableunbundlepart(unbundlepart):
1460 """A bundle2 part in a bundle that is seekable.
1459 """A bundle2 part in a bundle that is seekable.
1461
1460
1462 Regular ``unbundlepart`` instances can only be read once. This class
1461 Regular ``unbundlepart`` instances can only be read once. This class
1463 extends ``unbundlepart`` to enable bi-directional seeking within the
1462 extends ``unbundlepart`` to enable bi-directional seeking within the
1464 part.
1463 part.
1465
1464
1466 Bundle2 part data consists of framed chunks. Offsets when seeking
1465 Bundle2 part data consists of framed chunks. Offsets when seeking
1467 refer to the decoded data, not the offsets in the underlying bundle2
1466 refer to the decoded data, not the offsets in the underlying bundle2
1468 stream.
1467 stream.
1469
1468
1470 To facilitate quickly seeking within the decoded data, instances of this
1469 To facilitate quickly seeking within the decoded data, instances of this
1471 class maintain a mapping between offsets in the underlying stream and
1470 class maintain a mapping between offsets in the underlying stream and
1472 the decoded payload. This mapping will consume memory in proportion
1471 the decoded payload. This mapping will consume memory in proportion
1473 to the number of chunks within the payload (which almost certainly
1472 to the number of chunks within the payload (which almost certainly
1474 increases in proportion with the size of the part).
1473 increases in proportion with the size of the part).
1475 """
1474 """
1476
1475
1477 def __init__(self, ui, header, fp):
1476 def __init__(self, ui, header, fp):
1478 # (payload, file) offsets for chunk starts.
1477 # (payload, file) offsets for chunk starts.
1479 self._chunkindex = []
1478 self._chunkindex = []
1480
1479
1481 super(seekableunbundlepart, self).__init__(ui, header, fp)
1480 super(seekableunbundlepart, self).__init__(ui, header, fp)
1482
1481
1483 def _payloadchunks(self, chunknum=0):
1482 def _payloadchunks(self, chunknum=0):
1484 '''seek to specified chunk and start yielding data'''
1483 '''seek to specified chunk and start yielding data'''
1485 if len(self._chunkindex) == 0:
1484 if len(self._chunkindex) == 0:
1486 assert chunknum == 0, b'Must start with chunk 0'
1485 assert chunknum == 0, b'Must start with chunk 0'
1487 self._chunkindex.append((0, self._tellfp()))
1486 self._chunkindex.append((0, self._tellfp()))
1488 else:
1487 else:
1489 assert chunknum < len(self._chunkindex), (
1488 assert chunknum < len(self._chunkindex), (
1490 b'Unknown chunk %d' % chunknum
1489 b'Unknown chunk %d' % chunknum
1491 )
1490 )
1492 self._seekfp(self._chunkindex[chunknum][1])
1491 self._seekfp(self._chunkindex[chunknum][1])
1493
1492
1494 pos = self._chunkindex[chunknum][0]
1493 pos = self._chunkindex[chunknum][0]
1495
1494
1496 for chunk in decodepayloadchunks(self.ui, self._fp):
1495 for chunk in decodepayloadchunks(self.ui, self._fp):
1497 chunknum += 1
1496 chunknum += 1
1498 pos += len(chunk)
1497 pos += len(chunk)
1499 if chunknum == len(self._chunkindex):
1498 if chunknum == len(self._chunkindex):
1500 self._chunkindex.append((pos, self._tellfp()))
1499 self._chunkindex.append((pos, self._tellfp()))
1501
1500
1502 yield chunk
1501 yield chunk
1503
1502
1504 def _findchunk(self, pos):
1503 def _findchunk(self, pos):
1505 '''for a given payload position, return a chunk number and offset'''
1504 '''for a given payload position, return a chunk number and offset'''
1506 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1505 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1507 if ppos == pos:
1506 if ppos == pos:
1508 return chunk, 0
1507 return chunk, 0
1509 elif ppos > pos:
1508 elif ppos > pos:
1510 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1509 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1511 raise ValueError(b'Unknown chunk')
1510 raise ValueError(b'Unknown chunk')
1512
1511
1513 def tell(self):
1512 def tell(self):
1514 return self._pos
1513 return self._pos
1515
1514
1516 def seek(self, offset, whence=os.SEEK_SET):
1515 def seek(self, offset, whence=os.SEEK_SET):
1517 if whence == os.SEEK_SET:
1516 if whence == os.SEEK_SET:
1518 newpos = offset
1517 newpos = offset
1519 elif whence == os.SEEK_CUR:
1518 elif whence == os.SEEK_CUR:
1520 newpos = self._pos + offset
1519 newpos = self._pos + offset
1521 elif whence == os.SEEK_END:
1520 elif whence == os.SEEK_END:
1522 if not self.consumed:
1521 if not self.consumed:
1523 # Can't use self.consume() here because it advances self._pos.
1522 # Can't use self.consume() here because it advances self._pos.
1524 chunk = self.read(32768)
1523 chunk = self.read(32768)
1525 while chunk:
1524 while chunk:
1526 chunk = self.read(32768)
1525 chunk = self.read(32768)
1527 newpos = self._chunkindex[-1][0] - offset
1526 newpos = self._chunkindex[-1][0] - offset
1528 else:
1527 else:
1529 raise ValueError(b'Unknown whence value: %r' % (whence,))
1528 raise ValueError(b'Unknown whence value: %r' % (whence,))
1530
1529
1531 if newpos > self._chunkindex[-1][0] and not self.consumed:
1530 if newpos > self._chunkindex[-1][0] and not self.consumed:
1532 # Can't use self.consume() here because it advances self._pos.
1531 # Can't use self.consume() here because it advances self._pos.
1533 chunk = self.read(32768)
1532 chunk = self.read(32768)
1534 while chunk:
1533 while chunk:
1535 chunk = self.read(32668)
1534 chunk = self.read(32668)
1536
1535
1537 if not 0 <= newpos <= self._chunkindex[-1][0]:
1536 if not 0 <= newpos <= self._chunkindex[-1][0]:
1538 raise ValueError(b'Offset out of range')
1537 raise ValueError(b'Offset out of range')
1539
1538
1540 if self._pos != newpos:
1539 if self._pos != newpos:
1541 chunk, internaloffset = self._findchunk(newpos)
1540 chunk, internaloffset = self._findchunk(newpos)
1542 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1541 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1543 adjust = self.read(internaloffset)
1542 adjust = self.read(internaloffset)
1544 if len(adjust) != internaloffset:
1543 if len(adjust) != internaloffset:
1545 raise error.Abort(_(b'Seek failed\n'))
1544 raise error.Abort(_(b'Seek failed\n'))
1546 self._pos = newpos
1545 self._pos = newpos
1547
1546
1548 def _seekfp(self, offset, whence=0):
1547 def _seekfp(self, offset, whence=0):
1549 """move the underlying file pointer
1548 """move the underlying file pointer
1550
1549
1551 This method is meant for internal usage by the bundle2 protocol only.
1550 This method is meant for internal usage by the bundle2 protocol only.
1552 They directly manipulate the low level stream including bundle2 level
1551 They directly manipulate the low level stream including bundle2 level
1553 instruction.
1552 instruction.
1554
1553
1555 Do not use it to implement higher-level logic or methods."""
1554 Do not use it to implement higher-level logic or methods."""
1556 if self._seekable:
1555 if self._seekable:
1557 return self._fp.seek(offset, whence)
1556 return self._fp.seek(offset, whence)
1558 else:
1557 else:
1559 raise NotImplementedError(_(b'File pointer is not seekable'))
1558 raise NotImplementedError(_(b'File pointer is not seekable'))
1560
1559
1561 def _tellfp(self):
1560 def _tellfp(self):
1562 """return the file offset, or None if file is not seekable
1561 """return the file offset, or None if file is not seekable
1563
1562
1564 This method is meant for internal usage by the bundle2 protocol only.
1563 This method is meant for internal usage by the bundle2 protocol only.
1565 They directly manipulate the low level stream including bundle2 level
1564 They directly manipulate the low level stream including bundle2 level
1566 instruction.
1565 instruction.
1567
1566
1568 Do not use it to implement higher-level logic or methods."""
1567 Do not use it to implement higher-level logic or methods."""
1569 if self._seekable:
1568 if self._seekable:
1570 try:
1569 try:
1571 return self._fp.tell()
1570 return self._fp.tell()
1572 except IOError as e:
1571 except IOError as e:
1573 if e.errno == errno.ESPIPE:
1572 if e.errno == errno.ESPIPE:
1574 self._seekable = False
1573 self._seekable = False
1575 else:
1574 else:
1576 raise
1575 raise
1577 return None
1576 return None
1578
1577
1579
1578
1580 # These are only the static capabilities.
1579 # These are only the static capabilities.
1581 # Check the 'getrepocaps' function for the rest.
1580 # Check the 'getrepocaps' function for the rest.
1582 capabilities = {
1581 capabilities = {
1583 b'HG20': (),
1582 b'HG20': (),
1584 b'bookmarks': (),
1583 b'bookmarks': (),
1585 b'error': (b'abort', b'unsupportedcontent', b'pushraced', b'pushkey'),
1584 b'error': (b'abort', b'unsupportedcontent', b'pushraced', b'pushkey'),
1586 b'listkeys': (),
1585 b'listkeys': (),
1587 b'pushkey': (),
1586 b'pushkey': (),
1588 b'digests': tuple(sorted(util.DIGESTS.keys())),
1587 b'digests': tuple(sorted(util.DIGESTS.keys())),
1589 b'remote-changegroup': (b'http', b'https'),
1588 b'remote-changegroup': (b'http', b'https'),
1590 b'hgtagsfnodes': (),
1589 b'hgtagsfnodes': (),
1591 b'rev-branch-cache': (),
1590 b'rev-branch-cache': (),
1592 b'phases': (b'heads',),
1591 b'phases': (b'heads',),
1593 b'stream': (b'v2',),
1592 b'stream': (b'v2',),
1594 }
1593 }
1595
1594
1596
1595
1597 def getrepocaps(repo, allowpushback=False, role=None):
1596 def getrepocaps(repo, allowpushback=False, role=None):
1598 """return the bundle2 capabilities for a given repo
1597 """return the bundle2 capabilities for a given repo
1599
1598
1600 Exists to allow extensions (like evolution) to mutate the capabilities.
1599 Exists to allow extensions (like evolution) to mutate the capabilities.
1601
1600
1602 The returned value is used for servers advertising their capabilities as
1601 The returned value is used for servers advertising their capabilities as
1603 well as clients advertising their capabilities to servers as part of
1602 well as clients advertising their capabilities to servers as part of
1604 bundle2 requests. The ``role`` argument specifies which is which.
1603 bundle2 requests. The ``role`` argument specifies which is which.
1605 """
1604 """
1606 if role not in (b'client', b'server'):
1605 if role not in (b'client', b'server'):
1607 raise error.ProgrammingError(b'role argument must be client or server')
1606 raise error.ProgrammingError(b'role argument must be client or server')
1608
1607
1609 caps = capabilities.copy()
1608 caps = capabilities.copy()
1610 caps[b'changegroup'] = tuple(
1609 caps[b'changegroup'] = tuple(
1611 sorted(changegroup.supportedincomingversions(repo))
1610 sorted(changegroup.supportedincomingversions(repo))
1612 )
1611 )
1613 if obsolete.isenabled(repo, obsolete.exchangeopt):
1612 if obsolete.isenabled(repo, obsolete.exchangeopt):
1614 supportedformat = tuple(b'V%i' % v for v in obsolete.formats)
1613 supportedformat = tuple(b'V%i' % v for v in obsolete.formats)
1615 caps[b'obsmarkers'] = supportedformat
1614 caps[b'obsmarkers'] = supportedformat
1616 if allowpushback:
1615 if allowpushback:
1617 caps[b'pushback'] = ()
1616 caps[b'pushback'] = ()
1618 cpmode = repo.ui.config(b'server', b'concurrent-push-mode')
1617 cpmode = repo.ui.config(b'server', b'concurrent-push-mode')
1619 if cpmode == b'check-related':
1618 if cpmode == b'check-related':
1620 caps[b'checkheads'] = (b'related',)
1619 caps[b'checkheads'] = (b'related',)
1621 if b'phases' in repo.ui.configlist(b'devel', b'legacy.exchange'):
1620 if b'phases' in repo.ui.configlist(b'devel', b'legacy.exchange'):
1622 caps.pop(b'phases')
1621 caps.pop(b'phases')
1623
1622
1624 # Don't advertise stream clone support in server mode if not configured.
1623 # Don't advertise stream clone support in server mode if not configured.
1625 if role == b'server':
1624 if role == b'server':
1626 streamsupported = repo.ui.configbool(
1625 streamsupported = repo.ui.configbool(
1627 b'server', b'uncompressed', untrusted=True
1626 b'server', b'uncompressed', untrusted=True
1628 )
1627 )
1629 featuresupported = repo.ui.configbool(b'server', b'bundle2.stream')
1628 featuresupported = repo.ui.configbool(b'server', b'bundle2.stream')
1630
1629
1631 if not streamsupported or not featuresupported:
1630 if not streamsupported or not featuresupported:
1632 caps.pop(b'stream')
1631 caps.pop(b'stream')
1633 # Else always advertise support on client, because payload support
1632 # Else always advertise support on client, because payload support
1634 # should always be advertised.
1633 # should always be advertised.
1635
1634
1636 return caps
1635 return caps
1637
1636
1638
1637
1639 def bundle2caps(remote):
1638 def bundle2caps(remote):
1640 """return the bundle capabilities of a peer as dict"""
1639 """return the bundle capabilities of a peer as dict"""
1641 raw = remote.capable(b'bundle2')
1640 raw = remote.capable(b'bundle2')
1642 if not raw and raw != b'':
1641 if not raw and raw != b'':
1643 return {}
1642 return {}
1644 capsblob = urlreq.unquote(remote.capable(b'bundle2'))
1643 capsblob = urlreq.unquote(remote.capable(b'bundle2'))
1645 return decodecaps(capsblob)
1644 return decodecaps(capsblob)
1646
1645
1647
1646
1648 def obsmarkersversion(caps):
1647 def obsmarkersversion(caps):
1649 """extract the list of supported obsmarkers versions from a bundle2caps dict
1648 """extract the list of supported obsmarkers versions from a bundle2caps dict
1650 """
1649 """
1651 obscaps = caps.get(b'obsmarkers', ())
1650 obscaps = caps.get(b'obsmarkers', ())
1652 return [int(c[1:]) for c in obscaps if c.startswith(b'V')]
1651 return [int(c[1:]) for c in obscaps if c.startswith(b'V')]
1653
1652
1654
1653
1655 def writenewbundle(
1654 def writenewbundle(
1656 ui,
1655 ui,
1657 repo,
1656 repo,
1658 source,
1657 source,
1659 filename,
1658 filename,
1660 bundletype,
1659 bundletype,
1661 outgoing,
1660 outgoing,
1662 opts,
1661 opts,
1663 vfs=None,
1662 vfs=None,
1664 compression=None,
1663 compression=None,
1665 compopts=None,
1664 compopts=None,
1666 ):
1665 ):
1667 if bundletype.startswith(b'HG10'):
1666 if bundletype.startswith(b'HG10'):
1668 cg = changegroup.makechangegroup(repo, outgoing, b'01', source)
1667 cg = changegroup.makechangegroup(repo, outgoing, b'01', source)
1669 return writebundle(
1668 return writebundle(
1670 ui,
1669 ui,
1671 cg,
1670 cg,
1672 filename,
1671 filename,
1673 bundletype,
1672 bundletype,
1674 vfs=vfs,
1673 vfs=vfs,
1675 compression=compression,
1674 compression=compression,
1676 compopts=compopts,
1675 compopts=compopts,
1677 )
1676 )
1678 elif not bundletype.startswith(b'HG20'):
1677 elif not bundletype.startswith(b'HG20'):
1679 raise error.ProgrammingError(b'unknown bundle type: %s' % bundletype)
1678 raise error.ProgrammingError(b'unknown bundle type: %s' % bundletype)
1680
1679
1681 caps = {}
1680 caps = {}
1682 if b'obsolescence' in opts:
1681 if b'obsolescence' in opts:
1683 caps[b'obsmarkers'] = (b'V1',)
1682 caps[b'obsmarkers'] = (b'V1',)
1684 bundle = bundle20(ui, caps)
1683 bundle = bundle20(ui, caps)
1685 bundle.setcompression(compression, compopts)
1684 bundle.setcompression(compression, compopts)
1686 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1685 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1687 chunkiter = bundle.getchunks()
1686 chunkiter = bundle.getchunks()
1688
1687
1689 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1688 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1690
1689
1691
1690
1692 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1691 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1693 # We should eventually reconcile this logic with the one behind
1692 # We should eventually reconcile this logic with the one behind
1694 # 'exchange.getbundle2partsgenerator'.
1693 # 'exchange.getbundle2partsgenerator'.
1695 #
1694 #
1696 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1695 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1697 # different right now. So we keep them separated for now for the sake of
1696 # different right now. So we keep them separated for now for the sake of
1698 # simplicity.
1697 # simplicity.
1699
1698
1700 # we might not always want a changegroup in such bundle, for example in
1699 # we might not always want a changegroup in such bundle, for example in
1701 # stream bundles
1700 # stream bundles
1702 if opts.get(b'changegroup', True):
1701 if opts.get(b'changegroup', True):
1703 cgversion = opts.get(b'cg.version')
1702 cgversion = opts.get(b'cg.version')
1704 if cgversion is None:
1703 if cgversion is None:
1705 cgversion = changegroup.safeversion(repo)
1704 cgversion = changegroup.safeversion(repo)
1706 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1705 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1707 part = bundler.newpart(b'changegroup', data=cg.getchunks())
1706 part = bundler.newpart(b'changegroup', data=cg.getchunks())
1708 part.addparam(b'version', cg.version)
1707 part.addparam(b'version', cg.version)
1709 if b'clcount' in cg.extras:
1708 if b'clcount' in cg.extras:
1710 part.addparam(
1709 part.addparam(
1711 b'nbchanges', b'%d' % cg.extras[b'clcount'], mandatory=False
1710 b'nbchanges', b'%d' % cg.extras[b'clcount'], mandatory=False
1712 )
1711 )
1713 if opts.get(b'phases') and repo.revs(
1712 if opts.get(b'phases') and repo.revs(
1714 b'%ln and secret()', outgoing.missingheads
1713 b'%ln and secret()', outgoing.missingheads
1715 ):
1714 ):
1716 part.addparam(
1715 part.addparam(
1717 b'targetphase', b'%d' % phases.secret, mandatory=False
1716 b'targetphase', b'%d' % phases.secret, mandatory=False
1718 )
1717 )
1719 if b'exp-sidedata-flag' in repo.requirements:
1718 if b'exp-sidedata-flag' in repo.requirements:
1720 part.addparam(b'exp-sidedata', b'1')
1719 part.addparam(b'exp-sidedata', b'1')
1721
1720
1722 if opts.get(b'streamv2', False):
1721 if opts.get(b'streamv2', False):
1723 addpartbundlestream2(bundler, repo, stream=True)
1722 addpartbundlestream2(bundler, repo, stream=True)
1724
1723
1725 if opts.get(b'tagsfnodescache', True):
1724 if opts.get(b'tagsfnodescache', True):
1726 addparttagsfnodescache(repo, bundler, outgoing)
1725 addparttagsfnodescache(repo, bundler, outgoing)
1727
1726
1728 if opts.get(b'revbranchcache', True):
1727 if opts.get(b'revbranchcache', True):
1729 addpartrevbranchcache(repo, bundler, outgoing)
1728 addpartrevbranchcache(repo, bundler, outgoing)
1730
1729
1731 if opts.get(b'obsolescence', False):
1730 if opts.get(b'obsolescence', False):
1732 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1731 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1733 buildobsmarkerspart(bundler, obsmarkers)
1732 buildobsmarkerspart(bundler, obsmarkers)
1734
1733
1735 if opts.get(b'phases', False):
1734 if opts.get(b'phases', False):
1736 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1735 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1737 phasedata = phases.binaryencode(headsbyphase)
1736 phasedata = phases.binaryencode(headsbyphase)
1738 bundler.newpart(b'phase-heads', data=phasedata)
1737 bundler.newpart(b'phase-heads', data=phasedata)
1739
1738
1740
1739
1741 def addparttagsfnodescache(repo, bundler, outgoing):
1740 def addparttagsfnodescache(repo, bundler, outgoing):
1742 # we include the tags fnode cache for the bundle changeset
1741 # we include the tags fnode cache for the bundle changeset
1743 # (as an optional parts)
1742 # (as an optional parts)
1744 cache = tags.hgtagsfnodescache(repo.unfiltered())
1743 cache = tags.hgtagsfnodescache(repo.unfiltered())
1745 chunks = []
1744 chunks = []
1746
1745
1747 # .hgtags fnodes are only relevant for head changesets. While we could
1746 # .hgtags fnodes are only relevant for head changesets. While we could
1748 # transfer values for all known nodes, there will likely be little to
1747 # transfer values for all known nodes, there will likely be little to
1749 # no benefit.
1748 # no benefit.
1750 #
1749 #
1751 # We don't bother using a generator to produce output data because
1750 # We don't bother using a generator to produce output data because
1752 # a) we only have 40 bytes per head and even esoteric numbers of heads
1751 # a) we only have 40 bytes per head and even esoteric numbers of heads
1753 # consume little memory (1M heads is 40MB) b) we don't want to send the
1752 # consume little memory (1M heads is 40MB) b) we don't want to send the
1754 # part if we don't have entries and knowing if we have entries requires
1753 # part if we don't have entries and knowing if we have entries requires
1755 # cache lookups.
1754 # cache lookups.
1756 for node in outgoing.missingheads:
1755 for node in outgoing.missingheads:
1757 # Don't compute missing, as this may slow down serving.
1756 # Don't compute missing, as this may slow down serving.
1758 fnode = cache.getfnode(node, computemissing=False)
1757 fnode = cache.getfnode(node, computemissing=False)
1759 if fnode is not None:
1758 if fnode is not None:
1760 chunks.extend([node, fnode])
1759 chunks.extend([node, fnode])
1761
1760
1762 if chunks:
1761 if chunks:
1763 bundler.newpart(b'hgtagsfnodes', data=b''.join(chunks))
1762 bundler.newpart(b'hgtagsfnodes', data=b''.join(chunks))
1764
1763
1765
1764
1766 def addpartrevbranchcache(repo, bundler, outgoing):
1765 def addpartrevbranchcache(repo, bundler, outgoing):
1767 # we include the rev branch cache for the bundle changeset
1766 # we include the rev branch cache for the bundle changeset
1768 # (as an optional parts)
1767 # (as an optional parts)
1769 cache = repo.revbranchcache()
1768 cache = repo.revbranchcache()
1770 cl = repo.unfiltered().changelog
1769 cl = repo.unfiltered().changelog
1771 branchesdata = collections.defaultdict(lambda: (set(), set()))
1770 branchesdata = collections.defaultdict(lambda: (set(), set()))
1772 for node in outgoing.missing:
1771 for node in outgoing.missing:
1773 branch, close = cache.branchinfo(cl.rev(node))
1772 branch, close = cache.branchinfo(cl.rev(node))
1774 branchesdata[branch][close].add(node)
1773 branchesdata[branch][close].add(node)
1775
1774
1776 def generate():
1775 def generate():
1777 for branch, (nodes, closed) in sorted(branchesdata.items()):
1776 for branch, (nodes, closed) in sorted(branchesdata.items()):
1778 utf8branch = encoding.fromlocal(branch)
1777 utf8branch = encoding.fromlocal(branch)
1779 yield rbcstruct.pack(len(utf8branch), len(nodes), len(closed))
1778 yield rbcstruct.pack(len(utf8branch), len(nodes), len(closed))
1780 yield utf8branch
1779 yield utf8branch
1781 for n in sorted(nodes):
1780 for n in sorted(nodes):
1782 yield n
1781 yield n
1783 for n in sorted(closed):
1782 for n in sorted(closed):
1784 yield n
1783 yield n
1785
1784
1786 bundler.newpart(b'cache:rev-branch-cache', data=generate(), mandatory=False)
1785 bundler.newpart(b'cache:rev-branch-cache', data=generate(), mandatory=False)
1787
1786
1788
1787
1789 def _formatrequirementsspec(requirements):
1788 def _formatrequirementsspec(requirements):
1790 requirements = [req for req in requirements if req != b"shared"]
1789 requirements = [req for req in requirements if req != b"shared"]
1791 return urlreq.quote(b','.join(sorted(requirements)))
1790 return urlreq.quote(b','.join(sorted(requirements)))
1792
1791
1793
1792
1794 def _formatrequirementsparams(requirements):
1793 def _formatrequirementsparams(requirements):
1795 requirements = _formatrequirementsspec(requirements)
1794 requirements = _formatrequirementsspec(requirements)
1796 params = b"%s%s" % (urlreq.quote(b"requirements="), requirements)
1795 params = b"%s%s" % (urlreq.quote(b"requirements="), requirements)
1797 return params
1796 return params
1798
1797
1799
1798
1800 def addpartbundlestream2(bundler, repo, **kwargs):
1799 def addpartbundlestream2(bundler, repo, **kwargs):
1801 if not kwargs.get('stream', False):
1800 if not kwargs.get('stream', False):
1802 return
1801 return
1803
1802
1804 if not streamclone.allowservergeneration(repo):
1803 if not streamclone.allowservergeneration(repo):
1805 raise error.Abort(
1804 raise error.Abort(
1806 _(
1805 _(
1807 b'stream data requested but server does not allow '
1806 b'stream data requested but server does not allow '
1808 b'this feature'
1807 b'this feature'
1809 ),
1808 ),
1810 hint=_(
1809 hint=_(
1811 b'well-behaved clients should not be '
1810 b'well-behaved clients should not be '
1812 b'requesting stream data from servers not '
1811 b'requesting stream data from servers not '
1813 b'advertising it; the client may be buggy'
1812 b'advertising it; the client may be buggy'
1814 ),
1813 ),
1815 )
1814 )
1816
1815
1817 # Stream clones don't compress well. And compression undermines a
1816 # Stream clones don't compress well. And compression undermines a
1818 # goal of stream clones, which is to be fast. Communicate the desire
1817 # goal of stream clones, which is to be fast. Communicate the desire
1819 # to avoid compression to consumers of the bundle.
1818 # to avoid compression to consumers of the bundle.
1820 bundler.prefercompressed = False
1819 bundler.prefercompressed = False
1821
1820
1822 # get the includes and excludes
1821 # get the includes and excludes
1823 includepats = kwargs.get('includepats')
1822 includepats = kwargs.get('includepats')
1824 excludepats = kwargs.get('excludepats')
1823 excludepats = kwargs.get('excludepats')
1825
1824
1826 narrowstream = repo.ui.configbool(
1825 narrowstream = repo.ui.configbool(
1827 b'experimental', b'server.stream-narrow-clones'
1826 b'experimental', b'server.stream-narrow-clones'
1828 )
1827 )
1829
1828
1830 if (includepats or excludepats) and not narrowstream:
1829 if (includepats or excludepats) and not narrowstream:
1831 raise error.Abort(_(b'server does not support narrow stream clones'))
1830 raise error.Abort(_(b'server does not support narrow stream clones'))
1832
1831
1833 includeobsmarkers = False
1832 includeobsmarkers = False
1834 if repo.obsstore:
1833 if repo.obsstore:
1835 remoteversions = obsmarkersversion(bundler.capabilities)
1834 remoteversions = obsmarkersversion(bundler.capabilities)
1836 if not remoteversions:
1835 if not remoteversions:
1837 raise error.Abort(
1836 raise error.Abort(
1838 _(
1837 _(
1839 b'server has obsolescence markers, but client '
1838 b'server has obsolescence markers, but client '
1840 b'cannot receive them via stream clone'
1839 b'cannot receive them via stream clone'
1841 )
1840 )
1842 )
1841 )
1843 elif repo.obsstore._version in remoteversions:
1842 elif repo.obsstore._version in remoteversions:
1844 includeobsmarkers = True
1843 includeobsmarkers = True
1845
1844
1846 filecount, bytecount, it = streamclone.generatev2(
1845 filecount, bytecount, it = streamclone.generatev2(
1847 repo, includepats, excludepats, includeobsmarkers
1846 repo, includepats, excludepats, includeobsmarkers
1848 )
1847 )
1849 requirements = _formatrequirementsspec(repo.requirements)
1848 requirements = _formatrequirementsspec(repo.requirements)
1850 part = bundler.newpart(b'stream2', data=it)
1849 part = bundler.newpart(b'stream2', data=it)
1851 part.addparam(b'bytecount', b'%d' % bytecount, mandatory=True)
1850 part.addparam(b'bytecount', b'%d' % bytecount, mandatory=True)
1852 part.addparam(b'filecount', b'%d' % filecount, mandatory=True)
1851 part.addparam(b'filecount', b'%d' % filecount, mandatory=True)
1853 part.addparam(b'requirements', requirements, mandatory=True)
1852 part.addparam(b'requirements', requirements, mandatory=True)
1854
1853
1855
1854
1856 def buildobsmarkerspart(bundler, markers):
1855 def buildobsmarkerspart(bundler, markers):
1857 """add an obsmarker part to the bundler with <markers>
1856 """add an obsmarker part to the bundler with <markers>
1858
1857
1859 No part is created if markers is empty.
1858 No part is created if markers is empty.
1860 Raises ValueError if the bundler doesn't support any known obsmarker format.
1859 Raises ValueError if the bundler doesn't support any known obsmarker format.
1861 """
1860 """
1862 if not markers:
1861 if not markers:
1863 return None
1862 return None
1864
1863
1865 remoteversions = obsmarkersversion(bundler.capabilities)
1864 remoteversions = obsmarkersversion(bundler.capabilities)
1866 version = obsolete.commonversion(remoteversions)
1865 version = obsolete.commonversion(remoteversions)
1867 if version is None:
1866 if version is None:
1868 raise ValueError(b'bundler does not support common obsmarker format')
1867 raise ValueError(b'bundler does not support common obsmarker format')
1869 stream = obsolete.encodemarkers(markers, True, version=version)
1868 stream = obsolete.encodemarkers(markers, True, version=version)
1870 return bundler.newpart(b'obsmarkers', data=stream)
1869 return bundler.newpart(b'obsmarkers', data=stream)
1871
1870
1872
1871
1873 def writebundle(
1872 def writebundle(
1874 ui, cg, filename, bundletype, vfs=None, compression=None, compopts=None
1873 ui, cg, filename, bundletype, vfs=None, compression=None, compopts=None
1875 ):
1874 ):
1876 """Write a bundle file and return its filename.
1875 """Write a bundle file and return its filename.
1877
1876
1878 Existing files will not be overwritten.
1877 Existing files will not be overwritten.
1879 If no filename is specified, a temporary file is created.
1878 If no filename is specified, a temporary file is created.
1880 bz2 compression can be turned off.
1879 bz2 compression can be turned off.
1881 The bundle file will be deleted in case of errors.
1880 The bundle file will be deleted in case of errors.
1882 """
1881 """
1883
1882
1884 if bundletype == b"HG20":
1883 if bundletype == b"HG20":
1885 bundle = bundle20(ui)
1884 bundle = bundle20(ui)
1886 bundle.setcompression(compression, compopts)
1885 bundle.setcompression(compression, compopts)
1887 part = bundle.newpart(b'changegroup', data=cg.getchunks())
1886 part = bundle.newpart(b'changegroup', data=cg.getchunks())
1888 part.addparam(b'version', cg.version)
1887 part.addparam(b'version', cg.version)
1889 if b'clcount' in cg.extras:
1888 if b'clcount' in cg.extras:
1890 part.addparam(
1889 part.addparam(
1891 b'nbchanges', b'%d' % cg.extras[b'clcount'], mandatory=False
1890 b'nbchanges', b'%d' % cg.extras[b'clcount'], mandatory=False
1892 )
1891 )
1893 chunkiter = bundle.getchunks()
1892 chunkiter = bundle.getchunks()
1894 else:
1893 else:
1895 # compression argument is only for the bundle2 case
1894 # compression argument is only for the bundle2 case
1896 assert compression is None
1895 assert compression is None
1897 if cg.version != b'01':
1896 if cg.version != b'01':
1898 raise error.Abort(
1897 raise error.Abort(
1899 _(b'old bundle types only supports v1 changegroups')
1898 _(b'old bundle types only supports v1 changegroups')
1900 )
1899 )
1901 header, comp = bundletypes[bundletype]
1900 header, comp = bundletypes[bundletype]
1902 if comp not in util.compengines.supportedbundletypes:
1901 if comp not in util.compengines.supportedbundletypes:
1903 raise error.Abort(_(b'unknown stream compression type: %s') % comp)
1902 raise error.Abort(_(b'unknown stream compression type: %s') % comp)
1904 compengine = util.compengines.forbundletype(comp)
1903 compengine = util.compengines.forbundletype(comp)
1905
1904
1906 def chunkiter():
1905 def chunkiter():
1907 yield header
1906 yield header
1908 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1907 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1909 yield chunk
1908 yield chunk
1910
1909
1911 chunkiter = chunkiter()
1910 chunkiter = chunkiter()
1912
1911
1913 # parse the changegroup data, otherwise we will block
1912 # parse the changegroup data, otherwise we will block
1914 # in case of sshrepo because we don't know the end of the stream
1913 # in case of sshrepo because we don't know the end of the stream
1915 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1914 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1916
1915
1917
1916
1918 def combinechangegroupresults(op):
1917 def combinechangegroupresults(op):
1919 """logic to combine 0 or more addchangegroup results into one"""
1918 """logic to combine 0 or more addchangegroup results into one"""
1920 results = [r.get(b'return', 0) for r in op.records[b'changegroup']]
1919 results = [r.get(b'return', 0) for r in op.records[b'changegroup']]
1921 changedheads = 0
1920 changedheads = 0
1922 result = 1
1921 result = 1
1923 for ret in results:
1922 for ret in results:
1924 # If any changegroup result is 0, return 0
1923 # If any changegroup result is 0, return 0
1925 if ret == 0:
1924 if ret == 0:
1926 result = 0
1925 result = 0
1927 break
1926 break
1928 if ret < -1:
1927 if ret < -1:
1929 changedheads += ret + 1
1928 changedheads += ret + 1
1930 elif ret > 1:
1929 elif ret > 1:
1931 changedheads += ret - 1
1930 changedheads += ret - 1
1932 if changedheads > 0:
1931 if changedheads > 0:
1933 result = 1 + changedheads
1932 result = 1 + changedheads
1934 elif changedheads < 0:
1933 elif changedheads < 0:
1935 result = -1 + changedheads
1934 result = -1 + changedheads
1936 return result
1935 return result
1937
1936
1938
1937
1939 @parthandler(
1938 @parthandler(
1940 b'changegroup',
1939 b'changegroup',
1941 (
1940 (
1942 b'version',
1941 b'version',
1943 b'nbchanges',
1942 b'nbchanges',
1944 b'exp-sidedata',
1943 b'exp-sidedata',
1945 b'treemanifest',
1944 b'treemanifest',
1946 b'targetphase',
1945 b'targetphase',
1947 ),
1946 ),
1948 )
1947 )
1949 def handlechangegroup(op, inpart):
1948 def handlechangegroup(op, inpart):
1950 """apply a changegroup part on the repo
1949 """apply a changegroup part on the repo
1951
1950
1952 This is a very early implementation that will massive rework before being
1951 This is a very early implementation that will massive rework before being
1953 inflicted to any end-user.
1952 inflicted to any end-user.
1954 """
1953 """
1955 from . import localrepo
1954 from . import localrepo
1956
1955
1957 tr = op.gettransaction()
1956 tr = op.gettransaction()
1958 unpackerversion = inpart.params.get(b'version', b'01')
1957 unpackerversion = inpart.params.get(b'version', b'01')
1959 # We should raise an appropriate exception here
1958 # We should raise an appropriate exception here
1960 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1959 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1961 # the source and url passed here are overwritten by the one contained in
1960 # the source and url passed here are overwritten by the one contained in
1962 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1961 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1963 nbchangesets = None
1962 nbchangesets = None
1964 if b'nbchanges' in inpart.params:
1963 if b'nbchanges' in inpart.params:
1965 nbchangesets = int(inpart.params.get(b'nbchanges'))
1964 nbchangesets = int(inpart.params.get(b'nbchanges'))
1966 if (
1965 if (
1967 b'treemanifest' in inpart.params
1966 b'treemanifest' in inpart.params
1968 and b'treemanifest' not in op.repo.requirements
1967 and b'treemanifest' not in op.repo.requirements
1969 ):
1968 ):
1970 if len(op.repo.changelog) != 0:
1969 if len(op.repo.changelog) != 0:
1971 raise error.Abort(
1970 raise error.Abort(
1972 _(
1971 _(
1973 b"bundle contains tree manifests, but local repo is "
1972 b"bundle contains tree manifests, but local repo is "
1974 b"non-empty and does not use tree manifests"
1973 b"non-empty and does not use tree manifests"
1975 )
1974 )
1976 )
1975 )
1977 op.repo.requirements.add(b'treemanifest')
1976 op.repo.requirements.add(b'treemanifest')
1978 op.repo.svfs.options = localrepo.resolvestorevfsoptions(
1977 op.repo.svfs.options = localrepo.resolvestorevfsoptions(
1979 op.repo.ui, op.repo.requirements, op.repo.features
1978 op.repo.ui, op.repo.requirements, op.repo.features
1980 )
1979 )
1981 op.repo._writerequirements()
1980 op.repo._writerequirements()
1982
1981
1983 bundlesidedata = bool(b'exp-sidedata' in inpart.params)
1982 bundlesidedata = bool(b'exp-sidedata' in inpart.params)
1984 reposidedata = bool(b'exp-sidedata-flag' in op.repo.requirements)
1983 reposidedata = bool(b'exp-sidedata-flag' in op.repo.requirements)
1985 if reposidedata and not bundlesidedata:
1984 if reposidedata and not bundlesidedata:
1986 msg = b"repository is using sidedata but the bundle source do not"
1985 msg = b"repository is using sidedata but the bundle source do not"
1987 hint = b'this is currently unsupported'
1986 hint = b'this is currently unsupported'
1988 raise error.Abort(msg, hint=hint)
1987 raise error.Abort(msg, hint=hint)
1989
1988
1990 extrakwargs = {}
1989 extrakwargs = {}
1991 targetphase = inpart.params.get(b'targetphase')
1990 targetphase = inpart.params.get(b'targetphase')
1992 if targetphase is not None:
1991 if targetphase is not None:
1993 extrakwargs['targetphase'] = int(targetphase)
1992 extrakwargs['targetphase'] = int(targetphase)
1994 ret = _processchangegroup(
1993 ret = _processchangegroup(
1995 op,
1994 op,
1996 cg,
1995 cg,
1997 tr,
1996 tr,
1998 b'bundle2',
1997 b'bundle2',
1999 b'bundle2',
1998 b'bundle2',
2000 expectedtotal=nbchangesets,
1999 expectedtotal=nbchangesets,
2001 **extrakwargs
2000 **extrakwargs
2002 )
2001 )
2003 if op.reply is not None:
2002 if op.reply is not None:
2004 # This is definitely not the final form of this
2003 # This is definitely not the final form of this
2005 # return. But one need to start somewhere.
2004 # return. But one need to start somewhere.
2006 part = op.reply.newpart(b'reply:changegroup', mandatory=False)
2005 part = op.reply.newpart(b'reply:changegroup', mandatory=False)
2007 part.addparam(
2006 part.addparam(
2008 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2007 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2009 )
2008 )
2010 part.addparam(b'return', b'%i' % ret, mandatory=False)
2009 part.addparam(b'return', b'%i' % ret, mandatory=False)
2011 assert not inpart.read()
2010 assert not inpart.read()
2012
2011
2013
2012
2014 _remotechangegroupparams = tuple(
2013 _remotechangegroupparams = tuple(
2015 [b'url', b'size', b'digests']
2014 [b'url', b'size', b'digests']
2016 + [b'digest:%s' % k for k in util.DIGESTS.keys()]
2015 + [b'digest:%s' % k for k in util.DIGESTS.keys()]
2017 )
2016 )
2018
2017
2019
2018
2020 @parthandler(b'remote-changegroup', _remotechangegroupparams)
2019 @parthandler(b'remote-changegroup', _remotechangegroupparams)
2021 def handleremotechangegroup(op, inpart):
2020 def handleremotechangegroup(op, inpart):
2022 """apply a bundle10 on the repo, given an url and validation information
2021 """apply a bundle10 on the repo, given an url and validation information
2023
2022
2024 All the information about the remote bundle to import are given as
2023 All the information about the remote bundle to import are given as
2025 parameters. The parameters include:
2024 parameters. The parameters include:
2026 - url: the url to the bundle10.
2025 - url: the url to the bundle10.
2027 - size: the bundle10 file size. It is used to validate what was
2026 - size: the bundle10 file size. It is used to validate what was
2028 retrieved by the client matches the server knowledge about the bundle.
2027 retrieved by the client matches the server knowledge about the bundle.
2029 - digests: a space separated list of the digest types provided as
2028 - digests: a space separated list of the digest types provided as
2030 parameters.
2029 parameters.
2031 - digest:<digest-type>: the hexadecimal representation of the digest with
2030 - digest:<digest-type>: the hexadecimal representation of the digest with
2032 that name. Like the size, it is used to validate what was retrieved by
2031 that name. Like the size, it is used to validate what was retrieved by
2033 the client matches what the server knows about the bundle.
2032 the client matches what the server knows about the bundle.
2034
2033
2035 When multiple digest types are given, all of them are checked.
2034 When multiple digest types are given, all of them are checked.
2036 """
2035 """
2037 try:
2036 try:
2038 raw_url = inpart.params[b'url']
2037 raw_url = inpart.params[b'url']
2039 except KeyError:
2038 except KeyError:
2040 raise error.Abort(_(b'remote-changegroup: missing "%s" param') % b'url')
2039 raise error.Abort(_(b'remote-changegroup: missing "%s" param') % b'url')
2041 parsed_url = util.url(raw_url)
2040 parsed_url = util.url(raw_url)
2042 if parsed_url.scheme not in capabilities[b'remote-changegroup']:
2041 if parsed_url.scheme not in capabilities[b'remote-changegroup']:
2043 raise error.Abort(
2042 raise error.Abort(
2044 _(b'remote-changegroup does not support %s urls')
2043 _(b'remote-changegroup does not support %s urls')
2045 % parsed_url.scheme
2044 % parsed_url.scheme
2046 )
2045 )
2047
2046
2048 try:
2047 try:
2049 size = int(inpart.params[b'size'])
2048 size = int(inpart.params[b'size'])
2050 except ValueError:
2049 except ValueError:
2051 raise error.Abort(
2050 raise error.Abort(
2052 _(b'remote-changegroup: invalid value for param "%s"') % b'size'
2051 _(b'remote-changegroup: invalid value for param "%s"') % b'size'
2053 )
2052 )
2054 except KeyError:
2053 except KeyError:
2055 raise error.Abort(
2054 raise error.Abort(
2056 _(b'remote-changegroup: missing "%s" param') % b'size'
2055 _(b'remote-changegroup: missing "%s" param') % b'size'
2057 )
2056 )
2058
2057
2059 digests = {}
2058 digests = {}
2060 for typ in inpart.params.get(b'digests', b'').split():
2059 for typ in inpart.params.get(b'digests', b'').split():
2061 param = b'digest:%s' % typ
2060 param = b'digest:%s' % typ
2062 try:
2061 try:
2063 value = inpart.params[param]
2062 value = inpart.params[param]
2064 except KeyError:
2063 except KeyError:
2065 raise error.Abort(
2064 raise error.Abort(
2066 _(b'remote-changegroup: missing "%s" param') % param
2065 _(b'remote-changegroup: missing "%s" param') % param
2067 )
2066 )
2068 digests[typ] = value
2067 digests[typ] = value
2069
2068
2070 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
2069 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
2071
2070
2072 tr = op.gettransaction()
2071 tr = op.gettransaction()
2073 from . import exchange
2072 from . import exchange
2074
2073
2075 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
2074 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
2076 if not isinstance(cg, changegroup.cg1unpacker):
2075 if not isinstance(cg, changegroup.cg1unpacker):
2077 raise error.Abort(
2076 raise error.Abort(
2078 _(b'%s: not a bundle version 1.0') % util.hidepassword(raw_url)
2077 _(b'%s: not a bundle version 1.0') % util.hidepassword(raw_url)
2079 )
2078 )
2080 ret = _processchangegroup(op, cg, tr, b'bundle2', b'bundle2')
2079 ret = _processchangegroup(op, cg, tr, b'bundle2', b'bundle2')
2081 if op.reply is not None:
2080 if op.reply is not None:
2082 # This is definitely not the final form of this
2081 # This is definitely not the final form of this
2083 # return. But one need to start somewhere.
2082 # return. But one need to start somewhere.
2084 part = op.reply.newpart(b'reply:changegroup')
2083 part = op.reply.newpart(b'reply:changegroup')
2085 part.addparam(
2084 part.addparam(
2086 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2085 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2087 )
2086 )
2088 part.addparam(b'return', b'%i' % ret, mandatory=False)
2087 part.addparam(b'return', b'%i' % ret, mandatory=False)
2089 try:
2088 try:
2090 real_part.validate()
2089 real_part.validate()
2091 except error.Abort as e:
2090 except error.Abort as e:
2092 raise error.Abort(
2091 raise error.Abort(
2093 _(b'bundle at %s is corrupted:\n%s')
2092 _(b'bundle at %s is corrupted:\n%s')
2094 % (util.hidepassword(raw_url), bytes(e))
2093 % (util.hidepassword(raw_url), bytes(e))
2095 )
2094 )
2096 assert not inpart.read()
2095 assert not inpart.read()
2097
2096
2098
2097
2099 @parthandler(b'reply:changegroup', (b'return', b'in-reply-to'))
2098 @parthandler(b'reply:changegroup', (b'return', b'in-reply-to'))
2100 def handlereplychangegroup(op, inpart):
2099 def handlereplychangegroup(op, inpart):
2101 ret = int(inpart.params[b'return'])
2100 ret = int(inpart.params[b'return'])
2102 replyto = int(inpart.params[b'in-reply-to'])
2101 replyto = int(inpart.params[b'in-reply-to'])
2103 op.records.add(b'changegroup', {b'return': ret}, replyto)
2102 op.records.add(b'changegroup', {b'return': ret}, replyto)
2104
2103
2105
2104
2106 @parthandler(b'check:bookmarks')
2105 @parthandler(b'check:bookmarks')
2107 def handlecheckbookmarks(op, inpart):
2106 def handlecheckbookmarks(op, inpart):
2108 """check location of bookmarks
2107 """check location of bookmarks
2109
2108
2110 This part is to be used to detect push race regarding bookmark, it
2109 This part is to be used to detect push race regarding bookmark, it
2111 contains binary encoded (bookmark, node) tuple. If the local state does
2110 contains binary encoded (bookmark, node) tuple. If the local state does
2112 not marks the one in the part, a PushRaced exception is raised
2111 not marks the one in the part, a PushRaced exception is raised
2113 """
2112 """
2114 bookdata = bookmarks.binarydecode(inpart)
2113 bookdata = bookmarks.binarydecode(inpart)
2115
2114
2116 msgstandard = (
2115 msgstandard = (
2117 b'remote repository changed while pushing - please try again '
2116 b'remote repository changed while pushing - please try again '
2118 b'(bookmark "%s" move from %s to %s)'
2117 b'(bookmark "%s" move from %s to %s)'
2119 )
2118 )
2120 msgmissing = (
2119 msgmissing = (
2121 b'remote repository changed while pushing - please try again '
2120 b'remote repository changed while pushing - please try again '
2122 b'(bookmark "%s" is missing, expected %s)'
2121 b'(bookmark "%s" is missing, expected %s)'
2123 )
2122 )
2124 msgexist = (
2123 msgexist = (
2125 b'remote repository changed while pushing - please try again '
2124 b'remote repository changed while pushing - please try again '
2126 b'(bookmark "%s" set on %s, expected missing)'
2125 b'(bookmark "%s" set on %s, expected missing)'
2127 )
2126 )
2128 for book, node in bookdata:
2127 for book, node in bookdata:
2129 currentnode = op.repo._bookmarks.get(book)
2128 currentnode = op.repo._bookmarks.get(book)
2130 if currentnode != node:
2129 if currentnode != node:
2131 if node is None:
2130 if node is None:
2132 finalmsg = msgexist % (book, nodemod.short(currentnode))
2131 finalmsg = msgexist % (book, nodemod.short(currentnode))
2133 elif currentnode is None:
2132 elif currentnode is None:
2134 finalmsg = msgmissing % (book, nodemod.short(node))
2133 finalmsg = msgmissing % (book, nodemod.short(node))
2135 else:
2134 else:
2136 finalmsg = msgstandard % (
2135 finalmsg = msgstandard % (
2137 book,
2136 book,
2138 nodemod.short(node),
2137 nodemod.short(node),
2139 nodemod.short(currentnode),
2138 nodemod.short(currentnode),
2140 )
2139 )
2141 raise error.PushRaced(finalmsg)
2140 raise error.PushRaced(finalmsg)
2142
2141
2143
2142
2144 @parthandler(b'check:heads')
2143 @parthandler(b'check:heads')
2145 def handlecheckheads(op, inpart):
2144 def handlecheckheads(op, inpart):
2146 """check that head of the repo did not change
2145 """check that head of the repo did not change
2147
2146
2148 This is used to detect a push race when using unbundle.
2147 This is used to detect a push race when using unbundle.
2149 This replaces the "heads" argument of unbundle."""
2148 This replaces the "heads" argument of unbundle."""
2150 h = inpart.read(20)
2149 h = inpart.read(20)
2151 heads = []
2150 heads = []
2152 while len(h) == 20:
2151 while len(h) == 20:
2153 heads.append(h)
2152 heads.append(h)
2154 h = inpart.read(20)
2153 h = inpart.read(20)
2155 assert not h
2154 assert not h
2156 # Trigger a transaction so that we are guaranteed to have the lock now.
2155 # Trigger a transaction so that we are guaranteed to have the lock now.
2157 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2156 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2158 op.gettransaction()
2157 op.gettransaction()
2159 if sorted(heads) != sorted(op.repo.heads()):
2158 if sorted(heads) != sorted(op.repo.heads()):
2160 raise error.PushRaced(
2159 raise error.PushRaced(
2161 b'remote repository changed while pushing - please try again'
2160 b'remote repository changed while pushing - please try again'
2162 )
2161 )
2163
2162
2164
2163
2165 @parthandler(b'check:updated-heads')
2164 @parthandler(b'check:updated-heads')
2166 def handlecheckupdatedheads(op, inpart):
2165 def handlecheckupdatedheads(op, inpart):
2167 """check for race on the heads touched by a push
2166 """check for race on the heads touched by a push
2168
2167
2169 This is similar to 'check:heads' but focus on the heads actually updated
2168 This is similar to 'check:heads' but focus on the heads actually updated
2170 during the push. If other activities happen on unrelated heads, it is
2169 during the push. If other activities happen on unrelated heads, it is
2171 ignored.
2170 ignored.
2172
2171
2173 This allow server with high traffic to avoid push contention as long as
2172 This allow server with high traffic to avoid push contention as long as
2174 unrelated parts of the graph are involved."""
2173 unrelated parts of the graph are involved."""
2175 h = inpart.read(20)
2174 h = inpart.read(20)
2176 heads = []
2175 heads = []
2177 while len(h) == 20:
2176 while len(h) == 20:
2178 heads.append(h)
2177 heads.append(h)
2179 h = inpart.read(20)
2178 h = inpart.read(20)
2180 assert not h
2179 assert not h
2181 # trigger a transaction so that we are guaranteed to have the lock now.
2180 # trigger a transaction so that we are guaranteed to have the lock now.
2182 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2181 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2183 op.gettransaction()
2182 op.gettransaction()
2184
2183
2185 currentheads = set()
2184 currentheads = set()
2186 for ls in op.repo.branchmap().iterheads():
2185 for ls in op.repo.branchmap().iterheads():
2187 currentheads.update(ls)
2186 currentheads.update(ls)
2188
2187
2189 for h in heads:
2188 for h in heads:
2190 if h not in currentheads:
2189 if h not in currentheads:
2191 raise error.PushRaced(
2190 raise error.PushRaced(
2192 b'remote repository changed while pushing - '
2191 b'remote repository changed while pushing - '
2193 b'please try again'
2192 b'please try again'
2194 )
2193 )
2195
2194
2196
2195
2197 @parthandler(b'check:phases')
2196 @parthandler(b'check:phases')
2198 def handlecheckphases(op, inpart):
2197 def handlecheckphases(op, inpart):
2199 """check that phase boundaries of the repository did not change
2198 """check that phase boundaries of the repository did not change
2200
2199
2201 This is used to detect a push race.
2200 This is used to detect a push race.
2202 """
2201 """
2203 phasetonodes = phases.binarydecode(inpart)
2202 phasetonodes = phases.binarydecode(inpart)
2204 unfi = op.repo.unfiltered()
2203 unfi = op.repo.unfiltered()
2205 cl = unfi.changelog
2204 cl = unfi.changelog
2206 phasecache = unfi._phasecache
2205 phasecache = unfi._phasecache
2207 msg = (
2206 msg = (
2208 b'remote repository changed while pushing - please try again '
2207 b'remote repository changed while pushing - please try again '
2209 b'(%s is %s expected %s)'
2208 b'(%s is %s expected %s)'
2210 )
2209 )
2211 for expectedphase, nodes in enumerate(phasetonodes):
2210 for expectedphase, nodes in enumerate(phasetonodes):
2212 for n in nodes:
2211 for n in nodes:
2213 actualphase = phasecache.phase(unfi, cl.rev(n))
2212 actualphase = phasecache.phase(unfi, cl.rev(n))
2214 if actualphase != expectedphase:
2213 if actualphase != expectedphase:
2215 finalmsg = msg % (
2214 finalmsg = msg % (
2216 nodemod.short(n),
2215 nodemod.short(n),
2217 phases.phasenames[actualphase],
2216 phases.phasenames[actualphase],
2218 phases.phasenames[expectedphase],
2217 phases.phasenames[expectedphase],
2219 )
2218 )
2220 raise error.PushRaced(finalmsg)
2219 raise error.PushRaced(finalmsg)
2221
2220
2222
2221
2223 @parthandler(b'output')
2222 @parthandler(b'output')
2224 def handleoutput(op, inpart):
2223 def handleoutput(op, inpart):
2225 """forward output captured on the server to the client"""
2224 """forward output captured on the server to the client"""
2226 for line in inpart.read().splitlines():
2225 for line in inpart.read().splitlines():
2227 op.ui.status(_(b'remote: %s\n') % line)
2226 op.ui.status(_(b'remote: %s\n') % line)
2228
2227
2229
2228
2230 @parthandler(b'replycaps')
2229 @parthandler(b'replycaps')
2231 def handlereplycaps(op, inpart):
2230 def handlereplycaps(op, inpart):
2232 """Notify that a reply bundle should be created
2231 """Notify that a reply bundle should be created
2233
2232
2234 The payload contains the capabilities information for the reply"""
2233 The payload contains the capabilities information for the reply"""
2235 caps = decodecaps(inpart.read())
2234 caps = decodecaps(inpart.read())
2236 if op.reply is None:
2235 if op.reply is None:
2237 op.reply = bundle20(op.ui, caps)
2236 op.reply = bundle20(op.ui, caps)
2238
2237
2239
2238
2240 class AbortFromPart(error.Abort):
2239 class AbortFromPart(error.Abort):
2241 """Sub-class of Abort that denotes an error from a bundle2 part."""
2240 """Sub-class of Abort that denotes an error from a bundle2 part."""
2242
2241
2243
2242
2244 @parthandler(b'error:abort', (b'message', b'hint'))
2243 @parthandler(b'error:abort', (b'message', b'hint'))
2245 def handleerrorabort(op, inpart):
2244 def handleerrorabort(op, inpart):
2246 """Used to transmit abort error over the wire"""
2245 """Used to transmit abort error over the wire"""
2247 raise AbortFromPart(
2246 raise AbortFromPart(
2248 inpart.params[b'message'], hint=inpart.params.get(b'hint')
2247 inpart.params[b'message'], hint=inpart.params.get(b'hint')
2249 )
2248 )
2250
2249
2251
2250
2252 @parthandler(
2251 @parthandler(
2253 b'error:pushkey',
2252 b'error:pushkey',
2254 (b'namespace', b'key', b'new', b'old', b'ret', b'in-reply-to'),
2253 (b'namespace', b'key', b'new', b'old', b'ret', b'in-reply-to'),
2255 )
2254 )
2256 def handleerrorpushkey(op, inpart):
2255 def handleerrorpushkey(op, inpart):
2257 """Used to transmit failure of a mandatory pushkey over the wire"""
2256 """Used to transmit failure of a mandatory pushkey over the wire"""
2258 kwargs = {}
2257 kwargs = {}
2259 for name in (b'namespace', b'key', b'new', b'old', b'ret'):
2258 for name in (b'namespace', b'key', b'new', b'old', b'ret'):
2260 value = inpart.params.get(name)
2259 value = inpart.params.get(name)
2261 if value is not None:
2260 if value is not None:
2262 kwargs[name] = value
2261 kwargs[name] = value
2263 raise error.PushkeyFailed(
2262 raise error.PushkeyFailed(
2264 inpart.params[b'in-reply-to'], **pycompat.strkwargs(kwargs)
2263 inpart.params[b'in-reply-to'], **pycompat.strkwargs(kwargs)
2265 )
2264 )
2266
2265
2267
2266
2268 @parthandler(b'error:unsupportedcontent', (b'parttype', b'params'))
2267 @parthandler(b'error:unsupportedcontent', (b'parttype', b'params'))
2269 def handleerrorunsupportedcontent(op, inpart):
2268 def handleerrorunsupportedcontent(op, inpart):
2270 """Used to transmit unknown content error over the wire"""
2269 """Used to transmit unknown content error over the wire"""
2271 kwargs = {}
2270 kwargs = {}
2272 parttype = inpart.params.get(b'parttype')
2271 parttype = inpart.params.get(b'parttype')
2273 if parttype is not None:
2272 if parttype is not None:
2274 kwargs[b'parttype'] = parttype
2273 kwargs[b'parttype'] = parttype
2275 params = inpart.params.get(b'params')
2274 params = inpart.params.get(b'params')
2276 if params is not None:
2275 if params is not None:
2277 kwargs[b'params'] = params.split(b'\0')
2276 kwargs[b'params'] = params.split(b'\0')
2278
2277
2279 raise error.BundleUnknownFeatureError(**pycompat.strkwargs(kwargs))
2278 raise error.BundleUnknownFeatureError(**pycompat.strkwargs(kwargs))
2280
2279
2281
2280
2282 @parthandler(b'error:pushraced', (b'message',))
2281 @parthandler(b'error:pushraced', (b'message',))
2283 def handleerrorpushraced(op, inpart):
2282 def handleerrorpushraced(op, inpart):
2284 """Used to transmit push race error over the wire"""
2283 """Used to transmit push race error over the wire"""
2285 raise error.ResponseError(_(b'push failed:'), inpart.params[b'message'])
2284 raise error.ResponseError(_(b'push failed:'), inpart.params[b'message'])
2286
2285
2287
2286
2288 @parthandler(b'listkeys', (b'namespace',))
2287 @parthandler(b'listkeys', (b'namespace',))
2289 def handlelistkeys(op, inpart):
2288 def handlelistkeys(op, inpart):
2290 """retrieve pushkey namespace content stored in a bundle2"""
2289 """retrieve pushkey namespace content stored in a bundle2"""
2291 namespace = inpart.params[b'namespace']
2290 namespace = inpart.params[b'namespace']
2292 r = pushkey.decodekeys(inpart.read())
2291 r = pushkey.decodekeys(inpart.read())
2293 op.records.add(b'listkeys', (namespace, r))
2292 op.records.add(b'listkeys', (namespace, r))
2294
2293
2295
2294
2296 @parthandler(b'pushkey', (b'namespace', b'key', b'old', b'new'))
2295 @parthandler(b'pushkey', (b'namespace', b'key', b'old', b'new'))
2297 def handlepushkey(op, inpart):
2296 def handlepushkey(op, inpart):
2298 """process a pushkey request"""
2297 """process a pushkey request"""
2299 dec = pushkey.decode
2298 dec = pushkey.decode
2300 namespace = dec(inpart.params[b'namespace'])
2299 namespace = dec(inpart.params[b'namespace'])
2301 key = dec(inpart.params[b'key'])
2300 key = dec(inpart.params[b'key'])
2302 old = dec(inpart.params[b'old'])
2301 old = dec(inpart.params[b'old'])
2303 new = dec(inpart.params[b'new'])
2302 new = dec(inpart.params[b'new'])
2304 # Grab the transaction to ensure that we have the lock before performing the
2303 # Grab the transaction to ensure that we have the lock before performing the
2305 # pushkey.
2304 # pushkey.
2306 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2305 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2307 op.gettransaction()
2306 op.gettransaction()
2308 ret = op.repo.pushkey(namespace, key, old, new)
2307 ret = op.repo.pushkey(namespace, key, old, new)
2309 record = {b'namespace': namespace, b'key': key, b'old': old, b'new': new}
2308 record = {b'namespace': namespace, b'key': key, b'old': old, b'new': new}
2310 op.records.add(b'pushkey', record)
2309 op.records.add(b'pushkey', record)
2311 if op.reply is not None:
2310 if op.reply is not None:
2312 rpart = op.reply.newpart(b'reply:pushkey')
2311 rpart = op.reply.newpart(b'reply:pushkey')
2313 rpart.addparam(
2312 rpart.addparam(
2314 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2313 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2315 )
2314 )
2316 rpart.addparam(b'return', b'%i' % ret, mandatory=False)
2315 rpart.addparam(b'return', b'%i' % ret, mandatory=False)
2317 if inpart.mandatory and not ret:
2316 if inpart.mandatory and not ret:
2318 kwargs = {}
2317 kwargs = {}
2319 for key in (b'namespace', b'key', b'new', b'old', b'ret'):
2318 for key in (b'namespace', b'key', b'new', b'old', b'ret'):
2320 if key in inpart.params:
2319 if key in inpart.params:
2321 kwargs[key] = inpart.params[key]
2320 kwargs[key] = inpart.params[key]
2322 raise error.PushkeyFailed(
2321 raise error.PushkeyFailed(
2323 partid=b'%d' % inpart.id, **pycompat.strkwargs(kwargs)
2322 partid=b'%d' % inpart.id, **pycompat.strkwargs(kwargs)
2324 )
2323 )
2325
2324
2326
2325
2327 @parthandler(b'bookmarks')
2326 @parthandler(b'bookmarks')
2328 def handlebookmark(op, inpart):
2327 def handlebookmark(op, inpart):
2329 """transmit bookmark information
2328 """transmit bookmark information
2330
2329
2331 The part contains binary encoded bookmark information.
2330 The part contains binary encoded bookmark information.
2332
2331
2333 The exact behavior of this part can be controlled by the 'bookmarks' mode
2332 The exact behavior of this part can be controlled by the 'bookmarks' mode
2334 on the bundle operation.
2333 on the bundle operation.
2335
2334
2336 When mode is 'apply' (the default) the bookmark information is applied as
2335 When mode is 'apply' (the default) the bookmark information is applied as
2337 is to the unbundling repository. Make sure a 'check:bookmarks' part is
2336 is to the unbundling repository. Make sure a 'check:bookmarks' part is
2338 issued earlier to check for push races in such update. This behavior is
2337 issued earlier to check for push races in such update. This behavior is
2339 suitable for pushing.
2338 suitable for pushing.
2340
2339
2341 When mode is 'records', the information is recorded into the 'bookmarks'
2340 When mode is 'records', the information is recorded into the 'bookmarks'
2342 records of the bundle operation. This behavior is suitable for pulling.
2341 records of the bundle operation. This behavior is suitable for pulling.
2343 """
2342 """
2344 changes = bookmarks.binarydecode(inpart)
2343 changes = bookmarks.binarydecode(inpart)
2345
2344
2346 pushkeycompat = op.repo.ui.configbool(
2345 pushkeycompat = op.repo.ui.configbool(
2347 b'server', b'bookmarks-pushkey-compat'
2346 b'server', b'bookmarks-pushkey-compat'
2348 )
2347 )
2349 bookmarksmode = op.modes.get(b'bookmarks', b'apply')
2348 bookmarksmode = op.modes.get(b'bookmarks', b'apply')
2350
2349
2351 if bookmarksmode == b'apply':
2350 if bookmarksmode == b'apply':
2352 tr = op.gettransaction()
2351 tr = op.gettransaction()
2353 bookstore = op.repo._bookmarks
2352 bookstore = op.repo._bookmarks
2354 if pushkeycompat:
2353 if pushkeycompat:
2355 allhooks = []
2354 allhooks = []
2356 for book, node in changes:
2355 for book, node in changes:
2357 hookargs = tr.hookargs.copy()
2356 hookargs = tr.hookargs.copy()
2358 hookargs[b'pushkeycompat'] = b'1'
2357 hookargs[b'pushkeycompat'] = b'1'
2359 hookargs[b'namespace'] = b'bookmarks'
2358 hookargs[b'namespace'] = b'bookmarks'
2360 hookargs[b'key'] = book
2359 hookargs[b'key'] = book
2361 hookargs[b'old'] = nodemod.hex(bookstore.get(book, b''))
2360 hookargs[b'old'] = nodemod.hex(bookstore.get(book, b''))
2362 hookargs[b'new'] = nodemod.hex(
2361 hookargs[b'new'] = nodemod.hex(
2363 node if node is not None else b''
2362 node if node is not None else b''
2364 )
2363 )
2365 allhooks.append(hookargs)
2364 allhooks.append(hookargs)
2366
2365
2367 for hookargs in allhooks:
2366 for hookargs in allhooks:
2368 op.repo.hook(
2367 op.repo.hook(
2369 b'prepushkey', throw=True, **pycompat.strkwargs(hookargs)
2368 b'prepushkey', throw=True, **pycompat.strkwargs(hookargs)
2370 )
2369 )
2371
2370
2372 bookstore.applychanges(op.repo, op.gettransaction(), changes)
2371 bookstore.applychanges(op.repo, op.gettransaction(), changes)
2373
2372
2374 if pushkeycompat:
2373 if pushkeycompat:
2375
2374
2376 def runhook(unused_success):
2375 def runhook(unused_success):
2377 for hookargs in allhooks:
2376 for hookargs in allhooks:
2378 op.repo.hook(b'pushkey', **pycompat.strkwargs(hookargs))
2377 op.repo.hook(b'pushkey', **pycompat.strkwargs(hookargs))
2379
2378
2380 op.repo._afterlock(runhook)
2379 op.repo._afterlock(runhook)
2381
2380
2382 elif bookmarksmode == b'records':
2381 elif bookmarksmode == b'records':
2383 for book, node in changes:
2382 for book, node in changes:
2384 record = {b'bookmark': book, b'node': node}
2383 record = {b'bookmark': book, b'node': node}
2385 op.records.add(b'bookmarks', record)
2384 op.records.add(b'bookmarks', record)
2386 else:
2385 else:
2387 raise error.ProgrammingError(
2386 raise error.ProgrammingError(
2388 b'unkown bookmark mode: %s' % bookmarksmode
2387 b'unkown bookmark mode: %s' % bookmarksmode
2389 )
2388 )
2390
2389
2391
2390
2392 @parthandler(b'phase-heads')
2391 @parthandler(b'phase-heads')
2393 def handlephases(op, inpart):
2392 def handlephases(op, inpart):
2394 """apply phases from bundle part to repo"""
2393 """apply phases from bundle part to repo"""
2395 headsbyphase = phases.binarydecode(inpart)
2394 headsbyphase = phases.binarydecode(inpart)
2396 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
2395 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
2397
2396
2398
2397
2399 @parthandler(b'reply:pushkey', (b'return', b'in-reply-to'))
2398 @parthandler(b'reply:pushkey', (b'return', b'in-reply-to'))
2400 def handlepushkeyreply(op, inpart):
2399 def handlepushkeyreply(op, inpart):
2401 """retrieve the result of a pushkey request"""
2400 """retrieve the result of a pushkey request"""
2402 ret = int(inpart.params[b'return'])
2401 ret = int(inpart.params[b'return'])
2403 partid = int(inpart.params[b'in-reply-to'])
2402 partid = int(inpart.params[b'in-reply-to'])
2404 op.records.add(b'pushkey', {b'return': ret}, partid)
2403 op.records.add(b'pushkey', {b'return': ret}, partid)
2405
2404
2406
2405
2407 @parthandler(b'obsmarkers')
2406 @parthandler(b'obsmarkers')
2408 def handleobsmarker(op, inpart):
2407 def handleobsmarker(op, inpart):
2409 """add a stream of obsmarkers to the repo"""
2408 """add a stream of obsmarkers to the repo"""
2410 tr = op.gettransaction()
2409 tr = op.gettransaction()
2411 markerdata = inpart.read()
2410 markerdata = inpart.read()
2412 if op.ui.config(b'experimental', b'obsmarkers-exchange-debug'):
2411 if op.ui.config(b'experimental', b'obsmarkers-exchange-debug'):
2413 op.ui.writenoi18n(
2412 op.ui.writenoi18n(
2414 b'obsmarker-exchange: %i bytes received\n' % len(markerdata)
2413 b'obsmarker-exchange: %i bytes received\n' % len(markerdata)
2415 )
2414 )
2416 # The mergemarkers call will crash if marker creation is not enabled.
2415 # The mergemarkers call will crash if marker creation is not enabled.
2417 # we want to avoid this if the part is advisory.
2416 # we want to avoid this if the part is advisory.
2418 if not inpart.mandatory and op.repo.obsstore.readonly:
2417 if not inpart.mandatory and op.repo.obsstore.readonly:
2419 op.repo.ui.debug(
2418 op.repo.ui.debug(
2420 b'ignoring obsolescence markers, feature not enabled\n'
2419 b'ignoring obsolescence markers, feature not enabled\n'
2421 )
2420 )
2422 return
2421 return
2423 new = op.repo.obsstore.mergemarkers(tr, markerdata)
2422 new = op.repo.obsstore.mergemarkers(tr, markerdata)
2424 op.repo.invalidatevolatilesets()
2423 op.repo.invalidatevolatilesets()
2425 op.records.add(b'obsmarkers', {b'new': new})
2424 op.records.add(b'obsmarkers', {b'new': new})
2426 if op.reply is not None:
2425 if op.reply is not None:
2427 rpart = op.reply.newpart(b'reply:obsmarkers')
2426 rpart = op.reply.newpart(b'reply:obsmarkers')
2428 rpart.addparam(
2427 rpart.addparam(
2429 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2428 b'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
2430 )
2429 )
2431 rpart.addparam(b'new', b'%i' % new, mandatory=False)
2430 rpart.addparam(b'new', b'%i' % new, mandatory=False)
2432
2431
2433
2432
2434 @parthandler(b'reply:obsmarkers', (b'new', b'in-reply-to'))
2433 @parthandler(b'reply:obsmarkers', (b'new', b'in-reply-to'))
2435 def handleobsmarkerreply(op, inpart):
2434 def handleobsmarkerreply(op, inpart):
2436 """retrieve the result of a pushkey request"""
2435 """retrieve the result of a pushkey request"""
2437 ret = int(inpart.params[b'new'])
2436 ret = int(inpart.params[b'new'])
2438 partid = int(inpart.params[b'in-reply-to'])
2437 partid = int(inpart.params[b'in-reply-to'])
2439 op.records.add(b'obsmarkers', {b'new': ret}, partid)
2438 op.records.add(b'obsmarkers', {b'new': ret}, partid)
2440
2439
2441
2440
2442 @parthandler(b'hgtagsfnodes')
2441 @parthandler(b'hgtagsfnodes')
2443 def handlehgtagsfnodes(op, inpart):
2442 def handlehgtagsfnodes(op, inpart):
2444 """Applies .hgtags fnodes cache entries to the local repo.
2443 """Applies .hgtags fnodes cache entries to the local repo.
2445
2444
2446 Payload is pairs of 20 byte changeset nodes and filenodes.
2445 Payload is pairs of 20 byte changeset nodes and filenodes.
2447 """
2446 """
2448 # Grab the transaction so we ensure that we have the lock at this point.
2447 # Grab the transaction so we ensure that we have the lock at this point.
2449 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2448 if op.ui.configbool(b'experimental', b'bundle2lazylocking'):
2450 op.gettransaction()
2449 op.gettransaction()
2451 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
2450 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
2452
2451
2453 count = 0
2452 count = 0
2454 while True:
2453 while True:
2455 node = inpart.read(20)
2454 node = inpart.read(20)
2456 fnode = inpart.read(20)
2455 fnode = inpart.read(20)
2457 if len(node) < 20 or len(fnode) < 20:
2456 if len(node) < 20 or len(fnode) < 20:
2458 op.ui.debug(b'ignoring incomplete received .hgtags fnodes data\n')
2457 op.ui.debug(b'ignoring incomplete received .hgtags fnodes data\n')
2459 break
2458 break
2460 cache.setfnode(node, fnode)
2459 cache.setfnode(node, fnode)
2461 count += 1
2460 count += 1
2462
2461
2463 cache.write()
2462 cache.write()
2464 op.ui.debug(b'applied %i hgtags fnodes cache entries\n' % count)
2463 op.ui.debug(b'applied %i hgtags fnodes cache entries\n' % count)
2465
2464
2466
2465
2467 rbcstruct = struct.Struct(b'>III')
2466 rbcstruct = struct.Struct(b'>III')
2468
2467
2469
2468
2470 @parthandler(b'cache:rev-branch-cache')
2469 @parthandler(b'cache:rev-branch-cache')
2471 def handlerbc(op, inpart):
2470 def handlerbc(op, inpart):
2472 """receive a rev-branch-cache payload and update the local cache
2471 """receive a rev-branch-cache payload and update the local cache
2473
2472
2474 The payload is a series of data related to each branch
2473 The payload is a series of data related to each branch
2475
2474
2476 1) branch name length
2475 1) branch name length
2477 2) number of open heads
2476 2) number of open heads
2478 3) number of closed heads
2477 3) number of closed heads
2479 4) open heads nodes
2478 4) open heads nodes
2480 5) closed heads nodes
2479 5) closed heads nodes
2481 """
2480 """
2482 total = 0
2481 total = 0
2483 rawheader = inpart.read(rbcstruct.size)
2482 rawheader = inpart.read(rbcstruct.size)
2484 cache = op.repo.revbranchcache()
2483 cache = op.repo.revbranchcache()
2485 cl = op.repo.unfiltered().changelog
2484 cl = op.repo.unfiltered().changelog
2486 while rawheader:
2485 while rawheader:
2487 header = rbcstruct.unpack(rawheader)
2486 header = rbcstruct.unpack(rawheader)
2488 total += header[1] + header[2]
2487 total += header[1] + header[2]
2489 utf8branch = inpart.read(header[0])
2488 utf8branch = inpart.read(header[0])
2490 branch = encoding.tolocal(utf8branch)
2489 branch = encoding.tolocal(utf8branch)
2491 for x in pycompat.xrange(header[1]):
2490 for x in pycompat.xrange(header[1]):
2492 node = inpart.read(20)
2491 node = inpart.read(20)
2493 rev = cl.rev(node)
2492 rev = cl.rev(node)
2494 cache.setdata(branch, rev, node, False)
2493 cache.setdata(branch, rev, node, False)
2495 for x in pycompat.xrange(header[2]):
2494 for x in pycompat.xrange(header[2]):
2496 node = inpart.read(20)
2495 node = inpart.read(20)
2497 rev = cl.rev(node)
2496 rev = cl.rev(node)
2498 cache.setdata(branch, rev, node, True)
2497 cache.setdata(branch, rev, node, True)
2499 rawheader = inpart.read(rbcstruct.size)
2498 rawheader = inpart.read(rbcstruct.size)
2500 cache.write()
2499 cache.write()
2501
2500
2502
2501
2503 @parthandler(b'pushvars')
2502 @parthandler(b'pushvars')
2504 def bundle2getvars(op, part):
2503 def bundle2getvars(op, part):
2505 '''unbundle a bundle2 containing shellvars on the server'''
2504 '''unbundle a bundle2 containing shellvars on the server'''
2506 # An option to disable unbundling on server-side for security reasons
2505 # An option to disable unbundling on server-side for security reasons
2507 if op.ui.configbool(b'push', b'pushvars.server'):
2506 if op.ui.configbool(b'push', b'pushvars.server'):
2508 hookargs = {}
2507 hookargs = {}
2509 for key, value in part.advisoryparams:
2508 for key, value in part.advisoryparams:
2510 key = key.upper()
2509 key = key.upper()
2511 # We want pushed variables to have USERVAR_ prepended so we know
2510 # We want pushed variables to have USERVAR_ prepended so we know
2512 # they came from the --pushvar flag.
2511 # they came from the --pushvar flag.
2513 key = b"USERVAR_" + key
2512 key = b"USERVAR_" + key
2514 hookargs[key] = value
2513 hookargs[key] = value
2515 op.addhookargs(hookargs)
2514 op.addhookargs(hookargs)
2516
2515
2517
2516
2518 @parthandler(b'stream2', (b'requirements', b'filecount', b'bytecount'))
2517 @parthandler(b'stream2', (b'requirements', b'filecount', b'bytecount'))
2519 def handlestreamv2bundle(op, part):
2518 def handlestreamv2bundle(op, part):
2520
2519
2521 requirements = urlreq.unquote(part.params[b'requirements']).split(b',')
2520 requirements = urlreq.unquote(part.params[b'requirements']).split(b',')
2522 filecount = int(part.params[b'filecount'])
2521 filecount = int(part.params[b'filecount'])
2523 bytecount = int(part.params[b'bytecount'])
2522 bytecount = int(part.params[b'bytecount'])
2524
2523
2525 repo = op.repo
2524 repo = op.repo
2526 if len(repo):
2525 if len(repo):
2527 msg = _(b'cannot apply stream clone to non empty repository')
2526 msg = _(b'cannot apply stream clone to non empty repository')
2528 raise error.Abort(msg)
2527 raise error.Abort(msg)
2529
2528
2530 repo.ui.debug(b'applying stream bundle\n')
2529 repo.ui.debug(b'applying stream bundle\n')
2531 streamclone.applybundlev2(repo, part, filecount, bytecount, requirements)
2530 streamclone.applybundlev2(repo, part, filecount, bytecount, requirements)
2532
2531
2533
2532
2534 def widen_bundle(
2533 def widen_bundle(
2535 bundler, repo, oldmatcher, newmatcher, common, known, cgversion, ellipses
2534 bundler, repo, oldmatcher, newmatcher, common, known, cgversion, ellipses
2536 ):
2535 ):
2537 """generates bundle2 for widening a narrow clone
2536 """generates bundle2 for widening a narrow clone
2538
2537
2539 bundler is the bundle to which data should be added
2538 bundler is the bundle to which data should be added
2540 repo is the localrepository instance
2539 repo is the localrepository instance
2541 oldmatcher matches what the client already has
2540 oldmatcher matches what the client already has
2542 newmatcher matches what the client needs (including what it already has)
2541 newmatcher matches what the client needs (including what it already has)
2543 common is set of common heads between server and client
2542 common is set of common heads between server and client
2544 known is a set of revs known on the client side (used in ellipses)
2543 known is a set of revs known on the client side (used in ellipses)
2545 cgversion is the changegroup version to send
2544 cgversion is the changegroup version to send
2546 ellipses is boolean value telling whether to send ellipses data or not
2545 ellipses is boolean value telling whether to send ellipses data or not
2547
2546
2548 returns bundle2 of the data required for extending
2547 returns bundle2 of the data required for extending
2549 """
2548 """
2550 commonnodes = set()
2549 commonnodes = set()
2551 cl = repo.changelog
2550 cl = repo.changelog
2552 for r in repo.revs(b"::%ln", common):
2551 for r in repo.revs(b"::%ln", common):
2553 commonnodes.add(cl.node(r))
2552 commonnodes.add(cl.node(r))
2554 if commonnodes:
2553 if commonnodes:
2555 # XXX: we should only send the filelogs (and treemanifest). user
2554 # XXX: we should only send the filelogs (and treemanifest). user
2556 # already has the changelog and manifest
2555 # already has the changelog and manifest
2557 packer = changegroup.getbundler(
2556 packer = changegroup.getbundler(
2558 cgversion,
2557 cgversion,
2559 repo,
2558 repo,
2560 oldmatcher=oldmatcher,
2559 oldmatcher=oldmatcher,
2561 matcher=newmatcher,
2560 matcher=newmatcher,
2562 fullnodes=commonnodes,
2561 fullnodes=commonnodes,
2563 )
2562 )
2564 cgdata = packer.generate(
2563 cgdata = packer.generate(
2565 {nodemod.nullid},
2564 {nodemod.nullid},
2566 list(commonnodes),
2565 list(commonnodes),
2567 False,
2566 False,
2568 b'narrow_widen',
2567 b'narrow_widen',
2569 changelog=False,
2568 changelog=False,
2570 )
2569 )
2571
2570
2572 part = bundler.newpart(b'changegroup', data=cgdata)
2571 part = bundler.newpart(b'changegroup', data=cgdata)
2573 part.addparam(b'version', cgversion)
2572 part.addparam(b'version', cgversion)
2574 if b'treemanifest' in repo.requirements:
2573 if b'treemanifest' in repo.requirements:
2575 part.addparam(b'treemanifest', b'1')
2574 part.addparam(b'treemanifest', b'1')
2576 if b'exp-sidedata-flag' in repo.requirements:
2575 if b'exp-sidedata-flag' in repo.requirements:
2577 part.addparam(b'exp-sidedata', b'1')
2576 part.addparam(b'exp-sidedata', b'1')
2578
2577
2579 return bundler
2578 return bundler
General Comments 0
You need to be logged in to leave comments. Login now