##// END OF EJS Templates
bundle2: avoid unbound read when seeking...
Gregory Szorc -
r35117:699b2a75 default
parent child Browse files
Show More
@@ -1,2023 +1,2030
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part header. When the header is empty
69 The total number of Bytes used by the part header. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 from __future__ import absolute_import, division
148 from __future__ import absolute_import, division
149
149
150 import errno
150 import errno
151 import os
151 import os
152 import re
152 import re
153 import string
153 import string
154 import struct
154 import struct
155 import sys
155 import sys
156
156
157 from .i18n import _
157 from .i18n import _
158 from . import (
158 from . import (
159 changegroup,
159 changegroup,
160 error,
160 error,
161 node as nodemod,
161 node as nodemod,
162 obsolete,
162 obsolete,
163 phases,
163 phases,
164 pushkey,
164 pushkey,
165 pycompat,
165 pycompat,
166 tags,
166 tags,
167 url,
167 url,
168 util,
168 util,
169 )
169 )
170
170
171 urlerr = util.urlerr
171 urlerr = util.urlerr
172 urlreq = util.urlreq
172 urlreq = util.urlreq
173
173
174 _pack = struct.pack
174 _pack = struct.pack
175 _unpack = struct.unpack
175 _unpack = struct.unpack
176
176
177 _fstreamparamsize = '>i'
177 _fstreamparamsize = '>i'
178 _fpartheadersize = '>i'
178 _fpartheadersize = '>i'
179 _fparttypesize = '>B'
179 _fparttypesize = '>B'
180 _fpartid = '>I'
180 _fpartid = '>I'
181 _fpayloadsize = '>i'
181 _fpayloadsize = '>i'
182 _fpartparamcount = '>BB'
182 _fpartparamcount = '>BB'
183
183
184 preferedchunksize = 4096
184 preferedchunksize = 4096
185
185
186 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
186 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
187
187
188 def outdebug(ui, message):
188 def outdebug(ui, message):
189 """debug regarding output stream (bundling)"""
189 """debug regarding output stream (bundling)"""
190 if ui.configbool('devel', 'bundle2.debug'):
190 if ui.configbool('devel', 'bundle2.debug'):
191 ui.debug('bundle2-output: %s\n' % message)
191 ui.debug('bundle2-output: %s\n' % message)
192
192
193 def indebug(ui, message):
193 def indebug(ui, message):
194 """debug on input stream (unbundling)"""
194 """debug on input stream (unbundling)"""
195 if ui.configbool('devel', 'bundle2.debug'):
195 if ui.configbool('devel', 'bundle2.debug'):
196 ui.debug('bundle2-input: %s\n' % message)
196 ui.debug('bundle2-input: %s\n' % message)
197
197
198 def validateparttype(parttype):
198 def validateparttype(parttype):
199 """raise ValueError if a parttype contains invalid character"""
199 """raise ValueError if a parttype contains invalid character"""
200 if _parttypeforbidden.search(parttype):
200 if _parttypeforbidden.search(parttype):
201 raise ValueError(parttype)
201 raise ValueError(parttype)
202
202
203 def _makefpartparamsizes(nbparams):
203 def _makefpartparamsizes(nbparams):
204 """return a struct format to read part parameter sizes
204 """return a struct format to read part parameter sizes
205
205
206 The number parameters is variable so we need to build that format
206 The number parameters is variable so we need to build that format
207 dynamically.
207 dynamically.
208 """
208 """
209 return '>'+('BB'*nbparams)
209 return '>'+('BB'*nbparams)
210
210
211 parthandlermapping = {}
211 parthandlermapping = {}
212
212
213 def parthandler(parttype, params=()):
213 def parthandler(parttype, params=()):
214 """decorator that register a function as a bundle2 part handler
214 """decorator that register a function as a bundle2 part handler
215
215
216 eg::
216 eg::
217
217
218 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
218 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
219 def myparttypehandler(...):
219 def myparttypehandler(...):
220 '''process a part of type "my part".'''
220 '''process a part of type "my part".'''
221 ...
221 ...
222 """
222 """
223 validateparttype(parttype)
223 validateparttype(parttype)
224 def _decorator(func):
224 def _decorator(func):
225 lparttype = parttype.lower() # enforce lower case matching.
225 lparttype = parttype.lower() # enforce lower case matching.
226 assert lparttype not in parthandlermapping
226 assert lparttype not in parthandlermapping
227 parthandlermapping[lparttype] = func
227 parthandlermapping[lparttype] = func
228 func.params = frozenset(params)
228 func.params = frozenset(params)
229 return func
229 return func
230 return _decorator
230 return _decorator
231
231
232 class unbundlerecords(object):
232 class unbundlerecords(object):
233 """keep record of what happens during and unbundle
233 """keep record of what happens during and unbundle
234
234
235 New records are added using `records.add('cat', obj)`. Where 'cat' is a
235 New records are added using `records.add('cat', obj)`. Where 'cat' is a
236 category of record and obj is an arbitrary object.
236 category of record and obj is an arbitrary object.
237
237
238 `records['cat']` will return all entries of this category 'cat'.
238 `records['cat']` will return all entries of this category 'cat'.
239
239
240 Iterating on the object itself will yield `('category', obj)` tuples
240 Iterating on the object itself will yield `('category', obj)` tuples
241 for all entries.
241 for all entries.
242
242
243 All iterations happens in chronological order.
243 All iterations happens in chronological order.
244 """
244 """
245
245
246 def __init__(self):
246 def __init__(self):
247 self._categories = {}
247 self._categories = {}
248 self._sequences = []
248 self._sequences = []
249 self._replies = {}
249 self._replies = {}
250
250
251 def add(self, category, entry, inreplyto=None):
251 def add(self, category, entry, inreplyto=None):
252 """add a new record of a given category.
252 """add a new record of a given category.
253
253
254 The entry can then be retrieved in the list returned by
254 The entry can then be retrieved in the list returned by
255 self['category']."""
255 self['category']."""
256 self._categories.setdefault(category, []).append(entry)
256 self._categories.setdefault(category, []).append(entry)
257 self._sequences.append((category, entry))
257 self._sequences.append((category, entry))
258 if inreplyto is not None:
258 if inreplyto is not None:
259 self.getreplies(inreplyto).add(category, entry)
259 self.getreplies(inreplyto).add(category, entry)
260
260
261 def getreplies(self, partid):
261 def getreplies(self, partid):
262 """get the records that are replies to a specific part"""
262 """get the records that are replies to a specific part"""
263 return self._replies.setdefault(partid, unbundlerecords())
263 return self._replies.setdefault(partid, unbundlerecords())
264
264
265 def __getitem__(self, cat):
265 def __getitem__(self, cat):
266 return tuple(self._categories.get(cat, ()))
266 return tuple(self._categories.get(cat, ()))
267
267
268 def __iter__(self):
268 def __iter__(self):
269 return iter(self._sequences)
269 return iter(self._sequences)
270
270
271 def __len__(self):
271 def __len__(self):
272 return len(self._sequences)
272 return len(self._sequences)
273
273
274 def __nonzero__(self):
274 def __nonzero__(self):
275 return bool(self._sequences)
275 return bool(self._sequences)
276
276
277 __bool__ = __nonzero__
277 __bool__ = __nonzero__
278
278
279 class bundleoperation(object):
279 class bundleoperation(object):
280 """an object that represents a single bundling process
280 """an object that represents a single bundling process
281
281
282 Its purpose is to carry unbundle-related objects and states.
282 Its purpose is to carry unbundle-related objects and states.
283
283
284 A new object should be created at the beginning of each bundle processing.
284 A new object should be created at the beginning of each bundle processing.
285 The object is to be returned by the processing function.
285 The object is to be returned by the processing function.
286
286
287 The object has very little content now it will ultimately contain:
287 The object has very little content now it will ultimately contain:
288 * an access to the repo the bundle is applied to,
288 * an access to the repo the bundle is applied to,
289 * a ui object,
289 * a ui object,
290 * a way to retrieve a transaction to add changes to the repo,
290 * a way to retrieve a transaction to add changes to the repo,
291 * a way to record the result of processing each part,
291 * a way to record the result of processing each part,
292 * a way to construct a bundle response when applicable.
292 * a way to construct a bundle response when applicable.
293 """
293 """
294
294
295 def __init__(self, repo, transactiongetter, captureoutput=True):
295 def __init__(self, repo, transactiongetter, captureoutput=True):
296 self.repo = repo
296 self.repo = repo
297 self.ui = repo.ui
297 self.ui = repo.ui
298 self.records = unbundlerecords()
298 self.records = unbundlerecords()
299 self.reply = None
299 self.reply = None
300 self.captureoutput = captureoutput
300 self.captureoutput = captureoutput
301 self.hookargs = {}
301 self.hookargs = {}
302 self._gettransaction = transactiongetter
302 self._gettransaction = transactiongetter
303
303
304 def gettransaction(self):
304 def gettransaction(self):
305 transaction = self._gettransaction()
305 transaction = self._gettransaction()
306
306
307 if self.hookargs:
307 if self.hookargs:
308 # the ones added to the transaction supercede those added
308 # the ones added to the transaction supercede those added
309 # to the operation.
309 # to the operation.
310 self.hookargs.update(transaction.hookargs)
310 self.hookargs.update(transaction.hookargs)
311 transaction.hookargs = self.hookargs
311 transaction.hookargs = self.hookargs
312
312
313 # mark the hookargs as flushed. further attempts to add to
313 # mark the hookargs as flushed. further attempts to add to
314 # hookargs will result in an abort.
314 # hookargs will result in an abort.
315 self.hookargs = None
315 self.hookargs = None
316
316
317 return transaction
317 return transaction
318
318
319 def addhookargs(self, hookargs):
319 def addhookargs(self, hookargs):
320 if self.hookargs is None:
320 if self.hookargs is None:
321 raise error.ProgrammingError('attempted to add hookargs to '
321 raise error.ProgrammingError('attempted to add hookargs to '
322 'operation after transaction started')
322 'operation after transaction started')
323 self.hookargs.update(hookargs)
323 self.hookargs.update(hookargs)
324
324
325 class TransactionUnavailable(RuntimeError):
325 class TransactionUnavailable(RuntimeError):
326 pass
326 pass
327
327
328 def _notransaction():
328 def _notransaction():
329 """default method to get a transaction while processing a bundle
329 """default method to get a transaction while processing a bundle
330
330
331 Raise an exception to highlight the fact that no transaction was expected
331 Raise an exception to highlight the fact that no transaction was expected
332 to be created"""
332 to be created"""
333 raise TransactionUnavailable()
333 raise TransactionUnavailable()
334
334
335 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
335 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
336 # transform me into unbundler.apply() as soon as the freeze is lifted
336 # transform me into unbundler.apply() as soon as the freeze is lifted
337 if isinstance(unbundler, unbundle20):
337 if isinstance(unbundler, unbundle20):
338 tr.hookargs['bundle2'] = '1'
338 tr.hookargs['bundle2'] = '1'
339 if source is not None and 'source' not in tr.hookargs:
339 if source is not None and 'source' not in tr.hookargs:
340 tr.hookargs['source'] = source
340 tr.hookargs['source'] = source
341 if url is not None and 'url' not in tr.hookargs:
341 if url is not None and 'url' not in tr.hookargs:
342 tr.hookargs['url'] = url
342 tr.hookargs['url'] = url
343 return processbundle(repo, unbundler, lambda: tr)
343 return processbundle(repo, unbundler, lambda: tr)
344 else:
344 else:
345 # the transactiongetter won't be used, but we might as well set it
345 # the transactiongetter won't be used, but we might as well set it
346 op = bundleoperation(repo, lambda: tr)
346 op = bundleoperation(repo, lambda: tr)
347 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
347 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
348 return op
348 return op
349
349
350 class partiterator(object):
350 class partiterator(object):
351 def __init__(self, repo, op, unbundler):
351 def __init__(self, repo, op, unbundler):
352 self.repo = repo
352 self.repo = repo
353 self.op = op
353 self.op = op
354 self.unbundler = unbundler
354 self.unbundler = unbundler
355 self.iterator = None
355 self.iterator = None
356 self.count = 0
356 self.count = 0
357 self.current = None
357 self.current = None
358
358
359 def __enter__(self):
359 def __enter__(self):
360 def func():
360 def func():
361 itr = enumerate(self.unbundler.iterparts())
361 itr = enumerate(self.unbundler.iterparts())
362 for count, p in itr:
362 for count, p in itr:
363 self.count = count
363 self.count = count
364 self.current = p
364 self.current = p
365 yield p
365 yield p
366 p.consume()
366 p.consume()
367 self.current = None
367 self.current = None
368 self.iterator = func()
368 self.iterator = func()
369 return self.iterator
369 return self.iterator
370
370
371 def __exit__(self, type, exc, tb):
371 def __exit__(self, type, exc, tb):
372 if not self.iterator:
372 if not self.iterator:
373 return
373 return
374
374
375 # Only gracefully abort in a normal exception situation. User aborts
375 # Only gracefully abort in a normal exception situation. User aborts
376 # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
376 # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
377 # and should not gracefully cleanup.
377 # and should not gracefully cleanup.
378 if isinstance(exc, Exception):
378 if isinstance(exc, Exception):
379 # Any exceptions seeking to the end of the bundle at this point are
379 # Any exceptions seeking to the end of the bundle at this point are
380 # almost certainly related to the underlying stream being bad.
380 # almost certainly related to the underlying stream being bad.
381 # And, chances are that the exception we're handling is related to
381 # And, chances are that the exception we're handling is related to
382 # getting in that bad state. So, we swallow the seeking error and
382 # getting in that bad state. So, we swallow the seeking error and
383 # re-raise the original error.
383 # re-raise the original error.
384 seekerror = False
384 seekerror = False
385 try:
385 try:
386 if self.current:
386 if self.current:
387 # consume the part content to not corrupt the stream.
387 # consume the part content to not corrupt the stream.
388 self.current.consume()
388 self.current.consume()
389
389
390 for part in self.iterator:
390 for part in self.iterator:
391 # consume the bundle content
391 # consume the bundle content
392 part.consume()
392 part.consume()
393 except Exception:
393 except Exception:
394 seekerror = True
394 seekerror = True
395
395
396 # Small hack to let caller code distinguish exceptions from bundle2
396 # Small hack to let caller code distinguish exceptions from bundle2
397 # processing from processing the old format. This is mostly needed
397 # processing from processing the old format. This is mostly needed
398 # to handle different return codes to unbundle according to the type
398 # to handle different return codes to unbundle according to the type
399 # of bundle. We should probably clean up or drop this return code
399 # of bundle. We should probably clean up or drop this return code
400 # craziness in a future version.
400 # craziness in a future version.
401 exc.duringunbundle2 = True
401 exc.duringunbundle2 = True
402 salvaged = []
402 salvaged = []
403 replycaps = None
403 replycaps = None
404 if self.op.reply is not None:
404 if self.op.reply is not None:
405 salvaged = self.op.reply.salvageoutput()
405 salvaged = self.op.reply.salvageoutput()
406 replycaps = self.op.reply.capabilities
406 replycaps = self.op.reply.capabilities
407 exc._replycaps = replycaps
407 exc._replycaps = replycaps
408 exc._bundle2salvagedoutput = salvaged
408 exc._bundle2salvagedoutput = salvaged
409
409
410 # Re-raising from a variable loses the original stack. So only use
410 # Re-raising from a variable loses the original stack. So only use
411 # that form if we need to.
411 # that form if we need to.
412 if seekerror:
412 if seekerror:
413 raise exc
413 raise exc
414
414
415 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
415 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
416 self.count)
416 self.count)
417
417
418 def processbundle(repo, unbundler, transactiongetter=None, op=None):
418 def processbundle(repo, unbundler, transactiongetter=None, op=None):
419 """This function process a bundle, apply effect to/from a repo
419 """This function process a bundle, apply effect to/from a repo
420
420
421 It iterates over each part then searches for and uses the proper handling
421 It iterates over each part then searches for and uses the proper handling
422 code to process the part. Parts are processed in order.
422 code to process the part. Parts are processed in order.
423
423
424 Unknown Mandatory part will abort the process.
424 Unknown Mandatory part will abort the process.
425
425
426 It is temporarily possible to provide a prebuilt bundleoperation to the
426 It is temporarily possible to provide a prebuilt bundleoperation to the
427 function. This is used to ensure output is properly propagated in case of
427 function. This is used to ensure output is properly propagated in case of
428 an error during the unbundling. This output capturing part will likely be
428 an error during the unbundling. This output capturing part will likely be
429 reworked and this ability will probably go away in the process.
429 reworked and this ability will probably go away in the process.
430 """
430 """
431 if op is None:
431 if op is None:
432 if transactiongetter is None:
432 if transactiongetter is None:
433 transactiongetter = _notransaction
433 transactiongetter = _notransaction
434 op = bundleoperation(repo, transactiongetter)
434 op = bundleoperation(repo, transactiongetter)
435 # todo:
435 # todo:
436 # - replace this is a init function soon.
436 # - replace this is a init function soon.
437 # - exception catching
437 # - exception catching
438 unbundler.params
438 unbundler.params
439 if repo.ui.debugflag:
439 if repo.ui.debugflag:
440 msg = ['bundle2-input-bundle:']
440 msg = ['bundle2-input-bundle:']
441 if unbundler.params:
441 if unbundler.params:
442 msg.append(' %i params' % len(unbundler.params))
442 msg.append(' %i params' % len(unbundler.params))
443 if op._gettransaction is None or op._gettransaction is _notransaction:
443 if op._gettransaction is None or op._gettransaction is _notransaction:
444 msg.append(' no-transaction')
444 msg.append(' no-transaction')
445 else:
445 else:
446 msg.append(' with-transaction')
446 msg.append(' with-transaction')
447 msg.append('\n')
447 msg.append('\n')
448 repo.ui.debug(''.join(msg))
448 repo.ui.debug(''.join(msg))
449
449
450 processparts(repo, op, unbundler)
450 processparts(repo, op, unbundler)
451
451
452 return op
452 return op
453
453
454 def processparts(repo, op, unbundler):
454 def processparts(repo, op, unbundler):
455 with partiterator(repo, op, unbundler) as parts:
455 with partiterator(repo, op, unbundler) as parts:
456 for part in parts:
456 for part in parts:
457 _processpart(op, part)
457 _processpart(op, part)
458
458
459 def _processchangegroup(op, cg, tr, source, url, **kwargs):
459 def _processchangegroup(op, cg, tr, source, url, **kwargs):
460 ret = cg.apply(op.repo, tr, source, url, **kwargs)
460 ret = cg.apply(op.repo, tr, source, url, **kwargs)
461 op.records.add('changegroup', {
461 op.records.add('changegroup', {
462 'return': ret,
462 'return': ret,
463 })
463 })
464 return ret
464 return ret
465
465
466 def _gethandler(op, part):
466 def _gethandler(op, part):
467 status = 'unknown' # used by debug output
467 status = 'unknown' # used by debug output
468 try:
468 try:
469 handler = parthandlermapping.get(part.type)
469 handler = parthandlermapping.get(part.type)
470 if handler is None:
470 if handler is None:
471 status = 'unsupported-type'
471 status = 'unsupported-type'
472 raise error.BundleUnknownFeatureError(parttype=part.type)
472 raise error.BundleUnknownFeatureError(parttype=part.type)
473 indebug(op.ui, 'found a handler for part %s' % part.type)
473 indebug(op.ui, 'found a handler for part %s' % part.type)
474 unknownparams = part.mandatorykeys - handler.params
474 unknownparams = part.mandatorykeys - handler.params
475 if unknownparams:
475 if unknownparams:
476 unknownparams = list(unknownparams)
476 unknownparams = list(unknownparams)
477 unknownparams.sort()
477 unknownparams.sort()
478 status = 'unsupported-params (%s)' % ', '.join(unknownparams)
478 status = 'unsupported-params (%s)' % ', '.join(unknownparams)
479 raise error.BundleUnknownFeatureError(parttype=part.type,
479 raise error.BundleUnknownFeatureError(parttype=part.type,
480 params=unknownparams)
480 params=unknownparams)
481 status = 'supported'
481 status = 'supported'
482 except error.BundleUnknownFeatureError as exc:
482 except error.BundleUnknownFeatureError as exc:
483 if part.mandatory: # mandatory parts
483 if part.mandatory: # mandatory parts
484 raise
484 raise
485 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
485 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
486 return # skip to part processing
486 return # skip to part processing
487 finally:
487 finally:
488 if op.ui.debugflag:
488 if op.ui.debugflag:
489 msg = ['bundle2-input-part: "%s"' % part.type]
489 msg = ['bundle2-input-part: "%s"' % part.type]
490 if not part.mandatory:
490 if not part.mandatory:
491 msg.append(' (advisory)')
491 msg.append(' (advisory)')
492 nbmp = len(part.mandatorykeys)
492 nbmp = len(part.mandatorykeys)
493 nbap = len(part.params) - nbmp
493 nbap = len(part.params) - nbmp
494 if nbmp or nbap:
494 if nbmp or nbap:
495 msg.append(' (params:')
495 msg.append(' (params:')
496 if nbmp:
496 if nbmp:
497 msg.append(' %i mandatory' % nbmp)
497 msg.append(' %i mandatory' % nbmp)
498 if nbap:
498 if nbap:
499 msg.append(' %i advisory' % nbmp)
499 msg.append(' %i advisory' % nbmp)
500 msg.append(')')
500 msg.append(')')
501 msg.append(' %s\n' % status)
501 msg.append(' %s\n' % status)
502 op.ui.debug(''.join(msg))
502 op.ui.debug(''.join(msg))
503
503
504 return handler
504 return handler
505
505
506 def _processpart(op, part):
506 def _processpart(op, part):
507 """process a single part from a bundle
507 """process a single part from a bundle
508
508
509 The part is guaranteed to have been fully consumed when the function exits
509 The part is guaranteed to have been fully consumed when the function exits
510 (even if an exception is raised)."""
510 (even if an exception is raised)."""
511 handler = _gethandler(op, part)
511 handler = _gethandler(op, part)
512 if handler is None:
512 if handler is None:
513 return
513 return
514
514
515 # handler is called outside the above try block so that we don't
515 # handler is called outside the above try block so that we don't
516 # risk catching KeyErrors from anything other than the
516 # risk catching KeyErrors from anything other than the
517 # parthandlermapping lookup (any KeyError raised by handler()
517 # parthandlermapping lookup (any KeyError raised by handler()
518 # itself represents a defect of a different variety).
518 # itself represents a defect of a different variety).
519 output = None
519 output = None
520 if op.captureoutput and op.reply is not None:
520 if op.captureoutput and op.reply is not None:
521 op.ui.pushbuffer(error=True, subproc=True)
521 op.ui.pushbuffer(error=True, subproc=True)
522 output = ''
522 output = ''
523 try:
523 try:
524 handler(op, part)
524 handler(op, part)
525 finally:
525 finally:
526 if output is not None:
526 if output is not None:
527 output = op.ui.popbuffer()
527 output = op.ui.popbuffer()
528 if output:
528 if output:
529 outpart = op.reply.newpart('output', data=output,
529 outpart = op.reply.newpart('output', data=output,
530 mandatory=False)
530 mandatory=False)
531 outpart.addparam(
531 outpart.addparam(
532 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
532 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
533
533
534 def decodecaps(blob):
534 def decodecaps(blob):
535 """decode a bundle2 caps bytes blob into a dictionary
535 """decode a bundle2 caps bytes blob into a dictionary
536
536
537 The blob is a list of capabilities (one per line)
537 The blob is a list of capabilities (one per line)
538 Capabilities may have values using a line of the form::
538 Capabilities may have values using a line of the form::
539
539
540 capability=value1,value2,value3
540 capability=value1,value2,value3
541
541
542 The values are always a list."""
542 The values are always a list."""
543 caps = {}
543 caps = {}
544 for line in blob.splitlines():
544 for line in blob.splitlines():
545 if not line:
545 if not line:
546 continue
546 continue
547 if '=' not in line:
547 if '=' not in line:
548 key, vals = line, ()
548 key, vals = line, ()
549 else:
549 else:
550 key, vals = line.split('=', 1)
550 key, vals = line.split('=', 1)
551 vals = vals.split(',')
551 vals = vals.split(',')
552 key = urlreq.unquote(key)
552 key = urlreq.unquote(key)
553 vals = [urlreq.unquote(v) for v in vals]
553 vals = [urlreq.unquote(v) for v in vals]
554 caps[key] = vals
554 caps[key] = vals
555 return caps
555 return caps
556
556
557 def encodecaps(caps):
557 def encodecaps(caps):
558 """encode a bundle2 caps dictionary into a bytes blob"""
558 """encode a bundle2 caps dictionary into a bytes blob"""
559 chunks = []
559 chunks = []
560 for ca in sorted(caps):
560 for ca in sorted(caps):
561 vals = caps[ca]
561 vals = caps[ca]
562 ca = urlreq.quote(ca)
562 ca = urlreq.quote(ca)
563 vals = [urlreq.quote(v) for v in vals]
563 vals = [urlreq.quote(v) for v in vals]
564 if vals:
564 if vals:
565 ca = "%s=%s" % (ca, ','.join(vals))
565 ca = "%s=%s" % (ca, ','.join(vals))
566 chunks.append(ca)
566 chunks.append(ca)
567 return '\n'.join(chunks)
567 return '\n'.join(chunks)
568
568
569 bundletypes = {
569 bundletypes = {
570 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
570 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
571 # since the unification ssh accepts a header but there
571 # since the unification ssh accepts a header but there
572 # is no capability signaling it.
572 # is no capability signaling it.
573 "HG20": (), # special-cased below
573 "HG20": (), # special-cased below
574 "HG10UN": ("HG10UN", 'UN'),
574 "HG10UN": ("HG10UN", 'UN'),
575 "HG10BZ": ("HG10", 'BZ'),
575 "HG10BZ": ("HG10", 'BZ'),
576 "HG10GZ": ("HG10GZ", 'GZ'),
576 "HG10GZ": ("HG10GZ", 'GZ'),
577 }
577 }
578
578
579 # hgweb uses this list to communicate its preferred type
579 # hgweb uses this list to communicate its preferred type
580 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
580 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
581
581
582 class bundle20(object):
582 class bundle20(object):
583 """represent an outgoing bundle2 container
583 """represent an outgoing bundle2 container
584
584
585 Use the `addparam` method to add stream level parameter. and `newpart` to
585 Use the `addparam` method to add stream level parameter. and `newpart` to
586 populate it. Then call `getchunks` to retrieve all the binary chunks of
586 populate it. Then call `getchunks` to retrieve all the binary chunks of
587 data that compose the bundle2 container."""
587 data that compose the bundle2 container."""
588
588
589 _magicstring = 'HG20'
589 _magicstring = 'HG20'
590
590
591 def __init__(self, ui, capabilities=()):
591 def __init__(self, ui, capabilities=()):
592 self.ui = ui
592 self.ui = ui
593 self._params = []
593 self._params = []
594 self._parts = []
594 self._parts = []
595 self.capabilities = dict(capabilities)
595 self.capabilities = dict(capabilities)
596 self._compengine = util.compengines.forbundletype('UN')
596 self._compengine = util.compengines.forbundletype('UN')
597 self._compopts = None
597 self._compopts = None
598
598
599 def setcompression(self, alg, compopts=None):
599 def setcompression(self, alg, compopts=None):
600 """setup core part compression to <alg>"""
600 """setup core part compression to <alg>"""
601 if alg in (None, 'UN'):
601 if alg in (None, 'UN'):
602 return
602 return
603 assert not any(n.lower() == 'compression' for n, v in self._params)
603 assert not any(n.lower() == 'compression' for n, v in self._params)
604 self.addparam('Compression', alg)
604 self.addparam('Compression', alg)
605 self._compengine = util.compengines.forbundletype(alg)
605 self._compengine = util.compengines.forbundletype(alg)
606 self._compopts = compopts
606 self._compopts = compopts
607
607
608 @property
608 @property
609 def nbparts(self):
609 def nbparts(self):
610 """total number of parts added to the bundler"""
610 """total number of parts added to the bundler"""
611 return len(self._parts)
611 return len(self._parts)
612
612
613 # methods used to defines the bundle2 content
613 # methods used to defines the bundle2 content
614 def addparam(self, name, value=None):
614 def addparam(self, name, value=None):
615 """add a stream level parameter"""
615 """add a stream level parameter"""
616 if not name:
616 if not name:
617 raise ValueError(r'empty parameter name')
617 raise ValueError(r'empty parameter name')
618 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
618 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
619 raise ValueError(r'non letter first character: %s' % name)
619 raise ValueError(r'non letter first character: %s' % name)
620 self._params.append((name, value))
620 self._params.append((name, value))
621
621
622 def addpart(self, part):
622 def addpart(self, part):
623 """add a new part to the bundle2 container
623 """add a new part to the bundle2 container
624
624
625 Parts contains the actual applicative payload."""
625 Parts contains the actual applicative payload."""
626 assert part.id is None
626 assert part.id is None
627 part.id = len(self._parts) # very cheap counter
627 part.id = len(self._parts) # very cheap counter
628 self._parts.append(part)
628 self._parts.append(part)
629
629
630 def newpart(self, typeid, *args, **kwargs):
630 def newpart(self, typeid, *args, **kwargs):
631 """create a new part and add it to the containers
631 """create a new part and add it to the containers
632
632
633 As the part is directly added to the containers. For now, this means
633 As the part is directly added to the containers. For now, this means
634 that any failure to properly initialize the part after calling
634 that any failure to properly initialize the part after calling
635 ``newpart`` should result in a failure of the whole bundling process.
635 ``newpart`` should result in a failure of the whole bundling process.
636
636
637 You can still fall back to manually create and add if you need better
637 You can still fall back to manually create and add if you need better
638 control."""
638 control."""
639 part = bundlepart(typeid, *args, **kwargs)
639 part = bundlepart(typeid, *args, **kwargs)
640 self.addpart(part)
640 self.addpart(part)
641 return part
641 return part
642
642
643 # methods used to generate the bundle2 stream
643 # methods used to generate the bundle2 stream
644 def getchunks(self):
644 def getchunks(self):
645 if self.ui.debugflag:
645 if self.ui.debugflag:
646 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
646 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
647 if self._params:
647 if self._params:
648 msg.append(' (%i params)' % len(self._params))
648 msg.append(' (%i params)' % len(self._params))
649 msg.append(' %i parts total\n' % len(self._parts))
649 msg.append(' %i parts total\n' % len(self._parts))
650 self.ui.debug(''.join(msg))
650 self.ui.debug(''.join(msg))
651 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
651 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
652 yield self._magicstring
652 yield self._magicstring
653 param = self._paramchunk()
653 param = self._paramchunk()
654 outdebug(self.ui, 'bundle parameter: %s' % param)
654 outdebug(self.ui, 'bundle parameter: %s' % param)
655 yield _pack(_fstreamparamsize, len(param))
655 yield _pack(_fstreamparamsize, len(param))
656 if param:
656 if param:
657 yield param
657 yield param
658 for chunk in self._compengine.compressstream(self._getcorechunk(),
658 for chunk in self._compengine.compressstream(self._getcorechunk(),
659 self._compopts):
659 self._compopts):
660 yield chunk
660 yield chunk
661
661
662 def _paramchunk(self):
662 def _paramchunk(self):
663 """return a encoded version of all stream parameters"""
663 """return a encoded version of all stream parameters"""
664 blocks = []
664 blocks = []
665 for par, value in self._params:
665 for par, value in self._params:
666 par = urlreq.quote(par)
666 par = urlreq.quote(par)
667 if value is not None:
667 if value is not None:
668 value = urlreq.quote(value)
668 value = urlreq.quote(value)
669 par = '%s=%s' % (par, value)
669 par = '%s=%s' % (par, value)
670 blocks.append(par)
670 blocks.append(par)
671 return ' '.join(blocks)
671 return ' '.join(blocks)
672
672
673 def _getcorechunk(self):
673 def _getcorechunk(self):
674 """yield chunk for the core part of the bundle
674 """yield chunk for the core part of the bundle
675
675
676 (all but headers and parameters)"""
676 (all but headers and parameters)"""
677 outdebug(self.ui, 'start of parts')
677 outdebug(self.ui, 'start of parts')
678 for part in self._parts:
678 for part in self._parts:
679 outdebug(self.ui, 'bundle part: "%s"' % part.type)
679 outdebug(self.ui, 'bundle part: "%s"' % part.type)
680 for chunk in part.getchunks(ui=self.ui):
680 for chunk in part.getchunks(ui=self.ui):
681 yield chunk
681 yield chunk
682 outdebug(self.ui, 'end of bundle')
682 outdebug(self.ui, 'end of bundle')
683 yield _pack(_fpartheadersize, 0)
683 yield _pack(_fpartheadersize, 0)
684
684
685
685
686 def salvageoutput(self):
686 def salvageoutput(self):
687 """return a list with a copy of all output parts in the bundle
687 """return a list with a copy of all output parts in the bundle
688
688
689 This is meant to be used during error handling to make sure we preserve
689 This is meant to be used during error handling to make sure we preserve
690 server output"""
690 server output"""
691 salvaged = []
691 salvaged = []
692 for part in self._parts:
692 for part in self._parts:
693 if part.type.startswith('output'):
693 if part.type.startswith('output'):
694 salvaged.append(part.copy())
694 salvaged.append(part.copy())
695 return salvaged
695 return salvaged
696
696
697
697
698 class unpackermixin(object):
698 class unpackermixin(object):
699 """A mixin to extract bytes and struct data from a stream"""
699 """A mixin to extract bytes and struct data from a stream"""
700
700
701 def __init__(self, fp):
701 def __init__(self, fp):
702 self._fp = fp
702 self._fp = fp
703
703
704 def _unpack(self, format):
704 def _unpack(self, format):
705 """unpack this struct format from the stream
705 """unpack this struct format from the stream
706
706
707 This method is meant for internal usage by the bundle2 protocol only.
707 This method is meant for internal usage by the bundle2 protocol only.
708 They directly manipulate the low level stream including bundle2 level
708 They directly manipulate the low level stream including bundle2 level
709 instruction.
709 instruction.
710
710
711 Do not use it to implement higher-level logic or methods."""
711 Do not use it to implement higher-level logic or methods."""
712 data = self._readexact(struct.calcsize(format))
712 data = self._readexact(struct.calcsize(format))
713 return _unpack(format, data)
713 return _unpack(format, data)
714
714
715 def _readexact(self, size):
715 def _readexact(self, size):
716 """read exactly <size> bytes from the stream
716 """read exactly <size> bytes from the stream
717
717
718 This method is meant for internal usage by the bundle2 protocol only.
718 This method is meant for internal usage by the bundle2 protocol only.
719 They directly manipulate the low level stream including bundle2 level
719 They directly manipulate the low level stream including bundle2 level
720 instruction.
720 instruction.
721
721
722 Do not use it to implement higher-level logic or methods."""
722 Do not use it to implement higher-level logic or methods."""
723 return changegroup.readexactly(self._fp, size)
723 return changegroup.readexactly(self._fp, size)
724
724
725 def getunbundler(ui, fp, magicstring=None):
725 def getunbundler(ui, fp, magicstring=None):
726 """return a valid unbundler object for a given magicstring"""
726 """return a valid unbundler object for a given magicstring"""
727 if magicstring is None:
727 if magicstring is None:
728 magicstring = changegroup.readexactly(fp, 4)
728 magicstring = changegroup.readexactly(fp, 4)
729 magic, version = magicstring[0:2], magicstring[2:4]
729 magic, version = magicstring[0:2], magicstring[2:4]
730 if magic != 'HG':
730 if magic != 'HG':
731 ui.debug(
731 ui.debug(
732 "error: invalid magic: %r (version %r), should be 'HG'\n"
732 "error: invalid magic: %r (version %r), should be 'HG'\n"
733 % (magic, version))
733 % (magic, version))
734 raise error.Abort(_('not a Mercurial bundle'))
734 raise error.Abort(_('not a Mercurial bundle'))
735 unbundlerclass = formatmap.get(version)
735 unbundlerclass = formatmap.get(version)
736 if unbundlerclass is None:
736 if unbundlerclass is None:
737 raise error.Abort(_('unknown bundle version %s') % version)
737 raise error.Abort(_('unknown bundle version %s') % version)
738 unbundler = unbundlerclass(ui, fp)
738 unbundler = unbundlerclass(ui, fp)
739 indebug(ui, 'start processing of %s stream' % magicstring)
739 indebug(ui, 'start processing of %s stream' % magicstring)
740 return unbundler
740 return unbundler
741
741
742 class unbundle20(unpackermixin):
742 class unbundle20(unpackermixin):
743 """interpret a bundle2 stream
743 """interpret a bundle2 stream
744
744
745 This class is fed with a binary stream and yields parts through its
745 This class is fed with a binary stream and yields parts through its
746 `iterparts` methods."""
746 `iterparts` methods."""
747
747
748 _magicstring = 'HG20'
748 _magicstring = 'HG20'
749
749
750 def __init__(self, ui, fp):
750 def __init__(self, ui, fp):
751 """If header is specified, we do not read it out of the stream."""
751 """If header is specified, we do not read it out of the stream."""
752 self.ui = ui
752 self.ui = ui
753 self._compengine = util.compengines.forbundletype('UN')
753 self._compengine = util.compengines.forbundletype('UN')
754 self._compressed = None
754 self._compressed = None
755 super(unbundle20, self).__init__(fp)
755 super(unbundle20, self).__init__(fp)
756
756
757 @util.propertycache
757 @util.propertycache
758 def params(self):
758 def params(self):
759 """dictionary of stream level parameters"""
759 """dictionary of stream level parameters"""
760 indebug(self.ui, 'reading bundle2 stream parameters')
760 indebug(self.ui, 'reading bundle2 stream parameters')
761 params = {}
761 params = {}
762 paramssize = self._unpack(_fstreamparamsize)[0]
762 paramssize = self._unpack(_fstreamparamsize)[0]
763 if paramssize < 0:
763 if paramssize < 0:
764 raise error.BundleValueError('negative bundle param size: %i'
764 raise error.BundleValueError('negative bundle param size: %i'
765 % paramssize)
765 % paramssize)
766 if paramssize:
766 if paramssize:
767 params = self._readexact(paramssize)
767 params = self._readexact(paramssize)
768 params = self._processallparams(params)
768 params = self._processallparams(params)
769 return params
769 return params
770
770
771 def _processallparams(self, paramsblock):
771 def _processallparams(self, paramsblock):
772 """"""
772 """"""
773 params = util.sortdict()
773 params = util.sortdict()
774 for p in paramsblock.split(' '):
774 for p in paramsblock.split(' '):
775 p = p.split('=', 1)
775 p = p.split('=', 1)
776 p = [urlreq.unquote(i) for i in p]
776 p = [urlreq.unquote(i) for i in p]
777 if len(p) < 2:
777 if len(p) < 2:
778 p.append(None)
778 p.append(None)
779 self._processparam(*p)
779 self._processparam(*p)
780 params[p[0]] = p[1]
780 params[p[0]] = p[1]
781 return params
781 return params
782
782
783
783
784 def _processparam(self, name, value):
784 def _processparam(self, name, value):
785 """process a parameter, applying its effect if needed
785 """process a parameter, applying its effect if needed
786
786
787 Parameter starting with a lower case letter are advisory and will be
787 Parameter starting with a lower case letter are advisory and will be
788 ignored when unknown. Those starting with an upper case letter are
788 ignored when unknown. Those starting with an upper case letter are
789 mandatory and will this function will raise a KeyError when unknown.
789 mandatory and will this function will raise a KeyError when unknown.
790
790
791 Note: no option are currently supported. Any input will be either
791 Note: no option are currently supported. Any input will be either
792 ignored or failing.
792 ignored or failing.
793 """
793 """
794 if not name:
794 if not name:
795 raise ValueError(r'empty parameter name')
795 raise ValueError(r'empty parameter name')
796 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
796 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
797 raise ValueError(r'non letter first character: %s' % name)
797 raise ValueError(r'non letter first character: %s' % name)
798 try:
798 try:
799 handler = b2streamparamsmap[name.lower()]
799 handler = b2streamparamsmap[name.lower()]
800 except KeyError:
800 except KeyError:
801 if name[0:1].islower():
801 if name[0:1].islower():
802 indebug(self.ui, "ignoring unknown parameter %s" % name)
802 indebug(self.ui, "ignoring unknown parameter %s" % name)
803 else:
803 else:
804 raise error.BundleUnknownFeatureError(params=(name,))
804 raise error.BundleUnknownFeatureError(params=(name,))
805 else:
805 else:
806 handler(self, name, value)
806 handler(self, name, value)
807
807
808 def _forwardchunks(self):
808 def _forwardchunks(self):
809 """utility to transfer a bundle2 as binary
809 """utility to transfer a bundle2 as binary
810
810
811 This is made necessary by the fact the 'getbundle' command over 'ssh'
811 This is made necessary by the fact the 'getbundle' command over 'ssh'
812 have no way to know then the reply end, relying on the bundle to be
812 have no way to know then the reply end, relying on the bundle to be
813 interpreted to know its end. This is terrible and we are sorry, but we
813 interpreted to know its end. This is terrible and we are sorry, but we
814 needed to move forward to get general delta enabled.
814 needed to move forward to get general delta enabled.
815 """
815 """
816 yield self._magicstring
816 yield self._magicstring
817 assert 'params' not in vars(self)
817 assert 'params' not in vars(self)
818 paramssize = self._unpack(_fstreamparamsize)[0]
818 paramssize = self._unpack(_fstreamparamsize)[0]
819 if paramssize < 0:
819 if paramssize < 0:
820 raise error.BundleValueError('negative bundle param size: %i'
820 raise error.BundleValueError('negative bundle param size: %i'
821 % paramssize)
821 % paramssize)
822 yield _pack(_fstreamparamsize, paramssize)
822 yield _pack(_fstreamparamsize, paramssize)
823 if paramssize:
823 if paramssize:
824 params = self._readexact(paramssize)
824 params = self._readexact(paramssize)
825 self._processallparams(params)
825 self._processallparams(params)
826 yield params
826 yield params
827 assert self._compengine.bundletype == 'UN'
827 assert self._compengine.bundletype == 'UN'
828 # From there, payload might need to be decompressed
828 # From there, payload might need to be decompressed
829 self._fp = self._compengine.decompressorreader(self._fp)
829 self._fp = self._compengine.decompressorreader(self._fp)
830 emptycount = 0
830 emptycount = 0
831 while emptycount < 2:
831 while emptycount < 2:
832 # so we can brainlessly loop
832 # so we can brainlessly loop
833 assert _fpartheadersize == _fpayloadsize
833 assert _fpartheadersize == _fpayloadsize
834 size = self._unpack(_fpartheadersize)[0]
834 size = self._unpack(_fpartheadersize)[0]
835 yield _pack(_fpartheadersize, size)
835 yield _pack(_fpartheadersize, size)
836 if size:
836 if size:
837 emptycount = 0
837 emptycount = 0
838 else:
838 else:
839 emptycount += 1
839 emptycount += 1
840 continue
840 continue
841 if size == flaginterrupt:
841 if size == flaginterrupt:
842 continue
842 continue
843 elif size < 0:
843 elif size < 0:
844 raise error.BundleValueError('negative chunk size: %i')
844 raise error.BundleValueError('negative chunk size: %i')
845 yield self._readexact(size)
845 yield self._readexact(size)
846
846
847
847
848 def iterparts(self, seekable=False):
848 def iterparts(self, seekable=False):
849 """yield all parts contained in the stream"""
849 """yield all parts contained in the stream"""
850 cls = seekableunbundlepart if seekable else unbundlepart
850 cls = seekableunbundlepart if seekable else unbundlepart
851 # make sure param have been loaded
851 # make sure param have been loaded
852 self.params
852 self.params
853 # From there, payload need to be decompressed
853 # From there, payload need to be decompressed
854 self._fp = self._compengine.decompressorreader(self._fp)
854 self._fp = self._compengine.decompressorreader(self._fp)
855 indebug(self.ui, 'start extraction of bundle2 parts')
855 indebug(self.ui, 'start extraction of bundle2 parts')
856 headerblock = self._readpartheader()
856 headerblock = self._readpartheader()
857 while headerblock is not None:
857 while headerblock is not None:
858 part = cls(self.ui, headerblock, self._fp)
858 part = cls(self.ui, headerblock, self._fp)
859 yield part
859 yield part
860 # Ensure part is fully consumed so we can start reading the next
860 # Ensure part is fully consumed so we can start reading the next
861 # part.
861 # part.
862 part.consume()
862 part.consume()
863
863
864 headerblock = self._readpartheader()
864 headerblock = self._readpartheader()
865 indebug(self.ui, 'end of bundle2 stream')
865 indebug(self.ui, 'end of bundle2 stream')
866
866
867 def _readpartheader(self):
867 def _readpartheader(self):
868 """reads a part header size and return the bytes blob
868 """reads a part header size and return the bytes blob
869
869
870 returns None if empty"""
870 returns None if empty"""
871 headersize = self._unpack(_fpartheadersize)[0]
871 headersize = self._unpack(_fpartheadersize)[0]
872 if headersize < 0:
872 if headersize < 0:
873 raise error.BundleValueError('negative part header size: %i'
873 raise error.BundleValueError('negative part header size: %i'
874 % headersize)
874 % headersize)
875 indebug(self.ui, 'part header size: %i' % headersize)
875 indebug(self.ui, 'part header size: %i' % headersize)
876 if headersize:
876 if headersize:
877 return self._readexact(headersize)
877 return self._readexact(headersize)
878 return None
878 return None
879
879
880 def compressed(self):
880 def compressed(self):
881 self.params # load params
881 self.params # load params
882 return self._compressed
882 return self._compressed
883
883
884 def close(self):
884 def close(self):
885 """close underlying file"""
885 """close underlying file"""
886 if util.safehasattr(self._fp, 'close'):
886 if util.safehasattr(self._fp, 'close'):
887 return self._fp.close()
887 return self._fp.close()
888
888
889 formatmap = {'20': unbundle20}
889 formatmap = {'20': unbundle20}
890
890
891 b2streamparamsmap = {}
891 b2streamparamsmap = {}
892
892
893 def b2streamparamhandler(name):
893 def b2streamparamhandler(name):
894 """register a handler for a stream level parameter"""
894 """register a handler for a stream level parameter"""
895 def decorator(func):
895 def decorator(func):
896 assert name not in formatmap
896 assert name not in formatmap
897 b2streamparamsmap[name] = func
897 b2streamparamsmap[name] = func
898 return func
898 return func
899 return decorator
899 return decorator
900
900
901 @b2streamparamhandler('compression')
901 @b2streamparamhandler('compression')
902 def processcompression(unbundler, param, value):
902 def processcompression(unbundler, param, value):
903 """read compression parameter and install payload decompression"""
903 """read compression parameter and install payload decompression"""
904 if value not in util.compengines.supportedbundletypes:
904 if value not in util.compengines.supportedbundletypes:
905 raise error.BundleUnknownFeatureError(params=(param,),
905 raise error.BundleUnknownFeatureError(params=(param,),
906 values=(value,))
906 values=(value,))
907 unbundler._compengine = util.compengines.forbundletype(value)
907 unbundler._compengine = util.compengines.forbundletype(value)
908 if value is not None:
908 if value is not None:
909 unbundler._compressed = True
909 unbundler._compressed = True
910
910
911 class bundlepart(object):
911 class bundlepart(object):
912 """A bundle2 part contains application level payload
912 """A bundle2 part contains application level payload
913
913
914 The part `type` is used to route the part to the application level
914 The part `type` is used to route the part to the application level
915 handler.
915 handler.
916
916
917 The part payload is contained in ``part.data``. It could be raw bytes or a
917 The part payload is contained in ``part.data``. It could be raw bytes or a
918 generator of byte chunks.
918 generator of byte chunks.
919
919
920 You can add parameters to the part using the ``addparam`` method.
920 You can add parameters to the part using the ``addparam`` method.
921 Parameters can be either mandatory (default) or advisory. Remote side
921 Parameters can be either mandatory (default) or advisory. Remote side
922 should be able to safely ignore the advisory ones.
922 should be able to safely ignore the advisory ones.
923
923
924 Both data and parameters cannot be modified after the generation has begun.
924 Both data and parameters cannot be modified after the generation has begun.
925 """
925 """
926
926
927 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
927 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
928 data='', mandatory=True):
928 data='', mandatory=True):
929 validateparttype(parttype)
929 validateparttype(parttype)
930 self.id = None
930 self.id = None
931 self.type = parttype
931 self.type = parttype
932 self._data = data
932 self._data = data
933 self._mandatoryparams = list(mandatoryparams)
933 self._mandatoryparams = list(mandatoryparams)
934 self._advisoryparams = list(advisoryparams)
934 self._advisoryparams = list(advisoryparams)
935 # checking for duplicated entries
935 # checking for duplicated entries
936 self._seenparams = set()
936 self._seenparams = set()
937 for pname, __ in self._mandatoryparams + self._advisoryparams:
937 for pname, __ in self._mandatoryparams + self._advisoryparams:
938 if pname in self._seenparams:
938 if pname in self._seenparams:
939 raise error.ProgrammingError('duplicated params: %s' % pname)
939 raise error.ProgrammingError('duplicated params: %s' % pname)
940 self._seenparams.add(pname)
940 self._seenparams.add(pname)
941 # status of the part's generation:
941 # status of the part's generation:
942 # - None: not started,
942 # - None: not started,
943 # - False: currently generated,
943 # - False: currently generated,
944 # - True: generation done.
944 # - True: generation done.
945 self._generated = None
945 self._generated = None
946 self.mandatory = mandatory
946 self.mandatory = mandatory
947
947
948 def __repr__(self):
948 def __repr__(self):
949 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
949 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
950 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
950 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
951 % (cls, id(self), self.id, self.type, self.mandatory))
951 % (cls, id(self), self.id, self.type, self.mandatory))
952
952
953 def copy(self):
953 def copy(self):
954 """return a copy of the part
954 """return a copy of the part
955
955
956 The new part have the very same content but no partid assigned yet.
956 The new part have the very same content but no partid assigned yet.
957 Parts with generated data cannot be copied."""
957 Parts with generated data cannot be copied."""
958 assert not util.safehasattr(self.data, 'next')
958 assert not util.safehasattr(self.data, 'next')
959 return self.__class__(self.type, self._mandatoryparams,
959 return self.__class__(self.type, self._mandatoryparams,
960 self._advisoryparams, self._data, self.mandatory)
960 self._advisoryparams, self._data, self.mandatory)
961
961
962 # methods used to defines the part content
962 # methods used to defines the part content
963 @property
963 @property
964 def data(self):
964 def data(self):
965 return self._data
965 return self._data
966
966
967 @data.setter
967 @data.setter
968 def data(self, data):
968 def data(self, data):
969 if self._generated is not None:
969 if self._generated is not None:
970 raise error.ReadOnlyPartError('part is being generated')
970 raise error.ReadOnlyPartError('part is being generated')
971 self._data = data
971 self._data = data
972
972
973 @property
973 @property
974 def mandatoryparams(self):
974 def mandatoryparams(self):
975 # make it an immutable tuple to force people through ``addparam``
975 # make it an immutable tuple to force people through ``addparam``
976 return tuple(self._mandatoryparams)
976 return tuple(self._mandatoryparams)
977
977
978 @property
978 @property
979 def advisoryparams(self):
979 def advisoryparams(self):
980 # make it an immutable tuple to force people through ``addparam``
980 # make it an immutable tuple to force people through ``addparam``
981 return tuple(self._advisoryparams)
981 return tuple(self._advisoryparams)
982
982
983 def addparam(self, name, value='', mandatory=True):
983 def addparam(self, name, value='', mandatory=True):
984 """add a parameter to the part
984 """add a parameter to the part
985
985
986 If 'mandatory' is set to True, the remote handler must claim support
986 If 'mandatory' is set to True, the remote handler must claim support
987 for this parameter or the unbundling will be aborted.
987 for this parameter or the unbundling will be aborted.
988
988
989 The 'name' and 'value' cannot exceed 255 bytes each.
989 The 'name' and 'value' cannot exceed 255 bytes each.
990 """
990 """
991 if self._generated is not None:
991 if self._generated is not None:
992 raise error.ReadOnlyPartError('part is being generated')
992 raise error.ReadOnlyPartError('part is being generated')
993 if name in self._seenparams:
993 if name in self._seenparams:
994 raise ValueError('duplicated params: %s' % name)
994 raise ValueError('duplicated params: %s' % name)
995 self._seenparams.add(name)
995 self._seenparams.add(name)
996 params = self._advisoryparams
996 params = self._advisoryparams
997 if mandatory:
997 if mandatory:
998 params = self._mandatoryparams
998 params = self._mandatoryparams
999 params.append((name, value))
999 params.append((name, value))
1000
1000
1001 # methods used to generates the bundle2 stream
1001 # methods used to generates the bundle2 stream
1002 def getchunks(self, ui):
1002 def getchunks(self, ui):
1003 if self._generated is not None:
1003 if self._generated is not None:
1004 raise error.ProgrammingError('part can only be consumed once')
1004 raise error.ProgrammingError('part can only be consumed once')
1005 self._generated = False
1005 self._generated = False
1006
1006
1007 if ui.debugflag:
1007 if ui.debugflag:
1008 msg = ['bundle2-output-part: "%s"' % self.type]
1008 msg = ['bundle2-output-part: "%s"' % self.type]
1009 if not self.mandatory:
1009 if not self.mandatory:
1010 msg.append(' (advisory)')
1010 msg.append(' (advisory)')
1011 nbmp = len(self.mandatoryparams)
1011 nbmp = len(self.mandatoryparams)
1012 nbap = len(self.advisoryparams)
1012 nbap = len(self.advisoryparams)
1013 if nbmp or nbap:
1013 if nbmp or nbap:
1014 msg.append(' (params:')
1014 msg.append(' (params:')
1015 if nbmp:
1015 if nbmp:
1016 msg.append(' %i mandatory' % nbmp)
1016 msg.append(' %i mandatory' % nbmp)
1017 if nbap:
1017 if nbap:
1018 msg.append(' %i advisory' % nbmp)
1018 msg.append(' %i advisory' % nbmp)
1019 msg.append(')')
1019 msg.append(')')
1020 if not self.data:
1020 if not self.data:
1021 msg.append(' empty payload')
1021 msg.append(' empty payload')
1022 elif (util.safehasattr(self.data, 'next')
1022 elif (util.safehasattr(self.data, 'next')
1023 or util.safehasattr(self.data, '__next__')):
1023 or util.safehasattr(self.data, '__next__')):
1024 msg.append(' streamed payload')
1024 msg.append(' streamed payload')
1025 else:
1025 else:
1026 msg.append(' %i bytes payload' % len(self.data))
1026 msg.append(' %i bytes payload' % len(self.data))
1027 msg.append('\n')
1027 msg.append('\n')
1028 ui.debug(''.join(msg))
1028 ui.debug(''.join(msg))
1029
1029
1030 #### header
1030 #### header
1031 if self.mandatory:
1031 if self.mandatory:
1032 parttype = self.type.upper()
1032 parttype = self.type.upper()
1033 else:
1033 else:
1034 parttype = self.type.lower()
1034 parttype = self.type.lower()
1035 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1035 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1036 ## parttype
1036 ## parttype
1037 header = [_pack(_fparttypesize, len(parttype)),
1037 header = [_pack(_fparttypesize, len(parttype)),
1038 parttype, _pack(_fpartid, self.id),
1038 parttype, _pack(_fpartid, self.id),
1039 ]
1039 ]
1040 ## parameters
1040 ## parameters
1041 # count
1041 # count
1042 manpar = self.mandatoryparams
1042 manpar = self.mandatoryparams
1043 advpar = self.advisoryparams
1043 advpar = self.advisoryparams
1044 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1044 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1045 # size
1045 # size
1046 parsizes = []
1046 parsizes = []
1047 for key, value in manpar:
1047 for key, value in manpar:
1048 parsizes.append(len(key))
1048 parsizes.append(len(key))
1049 parsizes.append(len(value))
1049 parsizes.append(len(value))
1050 for key, value in advpar:
1050 for key, value in advpar:
1051 parsizes.append(len(key))
1051 parsizes.append(len(key))
1052 parsizes.append(len(value))
1052 parsizes.append(len(value))
1053 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1053 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1054 header.append(paramsizes)
1054 header.append(paramsizes)
1055 # key, value
1055 # key, value
1056 for key, value in manpar:
1056 for key, value in manpar:
1057 header.append(key)
1057 header.append(key)
1058 header.append(value)
1058 header.append(value)
1059 for key, value in advpar:
1059 for key, value in advpar:
1060 header.append(key)
1060 header.append(key)
1061 header.append(value)
1061 header.append(value)
1062 ## finalize header
1062 ## finalize header
1063 try:
1063 try:
1064 headerchunk = ''.join(header)
1064 headerchunk = ''.join(header)
1065 except TypeError:
1065 except TypeError:
1066 raise TypeError(r'Found a non-bytes trying to '
1066 raise TypeError(r'Found a non-bytes trying to '
1067 r'build bundle part header: %r' % header)
1067 r'build bundle part header: %r' % header)
1068 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1068 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1069 yield _pack(_fpartheadersize, len(headerchunk))
1069 yield _pack(_fpartheadersize, len(headerchunk))
1070 yield headerchunk
1070 yield headerchunk
1071 ## payload
1071 ## payload
1072 try:
1072 try:
1073 for chunk in self._payloadchunks():
1073 for chunk in self._payloadchunks():
1074 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1074 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1075 yield _pack(_fpayloadsize, len(chunk))
1075 yield _pack(_fpayloadsize, len(chunk))
1076 yield chunk
1076 yield chunk
1077 except GeneratorExit:
1077 except GeneratorExit:
1078 # GeneratorExit means that nobody is listening for our
1078 # GeneratorExit means that nobody is listening for our
1079 # results anyway, so just bail quickly rather than trying
1079 # results anyway, so just bail quickly rather than trying
1080 # to produce an error part.
1080 # to produce an error part.
1081 ui.debug('bundle2-generatorexit\n')
1081 ui.debug('bundle2-generatorexit\n')
1082 raise
1082 raise
1083 except BaseException as exc:
1083 except BaseException as exc:
1084 bexc = util.forcebytestr(exc)
1084 bexc = util.forcebytestr(exc)
1085 # backup exception data for later
1085 # backup exception data for later
1086 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1086 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1087 % bexc)
1087 % bexc)
1088 tb = sys.exc_info()[2]
1088 tb = sys.exc_info()[2]
1089 msg = 'unexpected error: %s' % bexc
1089 msg = 'unexpected error: %s' % bexc
1090 interpart = bundlepart('error:abort', [('message', msg)],
1090 interpart = bundlepart('error:abort', [('message', msg)],
1091 mandatory=False)
1091 mandatory=False)
1092 interpart.id = 0
1092 interpart.id = 0
1093 yield _pack(_fpayloadsize, -1)
1093 yield _pack(_fpayloadsize, -1)
1094 for chunk in interpart.getchunks(ui=ui):
1094 for chunk in interpart.getchunks(ui=ui):
1095 yield chunk
1095 yield chunk
1096 outdebug(ui, 'closing payload chunk')
1096 outdebug(ui, 'closing payload chunk')
1097 # abort current part payload
1097 # abort current part payload
1098 yield _pack(_fpayloadsize, 0)
1098 yield _pack(_fpayloadsize, 0)
1099 pycompat.raisewithtb(exc, tb)
1099 pycompat.raisewithtb(exc, tb)
1100 # end of payload
1100 # end of payload
1101 outdebug(ui, 'closing payload chunk')
1101 outdebug(ui, 'closing payload chunk')
1102 yield _pack(_fpayloadsize, 0)
1102 yield _pack(_fpayloadsize, 0)
1103 self._generated = True
1103 self._generated = True
1104
1104
1105 def _payloadchunks(self):
1105 def _payloadchunks(self):
1106 """yield chunks of a the part payload
1106 """yield chunks of a the part payload
1107
1107
1108 Exists to handle the different methods to provide data to a part."""
1108 Exists to handle the different methods to provide data to a part."""
1109 # we only support fixed size data now.
1109 # we only support fixed size data now.
1110 # This will be improved in the future.
1110 # This will be improved in the future.
1111 if (util.safehasattr(self.data, 'next')
1111 if (util.safehasattr(self.data, 'next')
1112 or util.safehasattr(self.data, '__next__')):
1112 or util.safehasattr(self.data, '__next__')):
1113 buff = util.chunkbuffer(self.data)
1113 buff = util.chunkbuffer(self.data)
1114 chunk = buff.read(preferedchunksize)
1114 chunk = buff.read(preferedchunksize)
1115 while chunk:
1115 while chunk:
1116 yield chunk
1116 yield chunk
1117 chunk = buff.read(preferedchunksize)
1117 chunk = buff.read(preferedchunksize)
1118 elif len(self.data):
1118 elif len(self.data):
1119 yield self.data
1119 yield self.data
1120
1120
1121
1121
1122 flaginterrupt = -1
1122 flaginterrupt = -1
1123
1123
1124 class interrupthandler(unpackermixin):
1124 class interrupthandler(unpackermixin):
1125 """read one part and process it with restricted capability
1125 """read one part and process it with restricted capability
1126
1126
1127 This allows to transmit exception raised on the producer size during part
1127 This allows to transmit exception raised on the producer size during part
1128 iteration while the consumer is reading a part.
1128 iteration while the consumer is reading a part.
1129
1129
1130 Part processed in this manner only have access to a ui object,"""
1130 Part processed in this manner only have access to a ui object,"""
1131
1131
1132 def __init__(self, ui, fp):
1132 def __init__(self, ui, fp):
1133 super(interrupthandler, self).__init__(fp)
1133 super(interrupthandler, self).__init__(fp)
1134 self.ui = ui
1134 self.ui = ui
1135
1135
1136 def _readpartheader(self):
1136 def _readpartheader(self):
1137 """reads a part header size and return the bytes blob
1137 """reads a part header size and return the bytes blob
1138
1138
1139 returns None if empty"""
1139 returns None if empty"""
1140 headersize = self._unpack(_fpartheadersize)[0]
1140 headersize = self._unpack(_fpartheadersize)[0]
1141 if headersize < 0:
1141 if headersize < 0:
1142 raise error.BundleValueError('negative part header size: %i'
1142 raise error.BundleValueError('negative part header size: %i'
1143 % headersize)
1143 % headersize)
1144 indebug(self.ui, 'part header size: %i\n' % headersize)
1144 indebug(self.ui, 'part header size: %i\n' % headersize)
1145 if headersize:
1145 if headersize:
1146 return self._readexact(headersize)
1146 return self._readexact(headersize)
1147 return None
1147 return None
1148
1148
1149 def __call__(self):
1149 def __call__(self):
1150
1150
1151 self.ui.debug('bundle2-input-stream-interrupt:'
1151 self.ui.debug('bundle2-input-stream-interrupt:'
1152 ' opening out of band context\n')
1152 ' opening out of band context\n')
1153 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1153 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1154 headerblock = self._readpartheader()
1154 headerblock = self._readpartheader()
1155 if headerblock is None:
1155 if headerblock is None:
1156 indebug(self.ui, 'no part found during interruption.')
1156 indebug(self.ui, 'no part found during interruption.')
1157 return
1157 return
1158 part = unbundlepart(self.ui, headerblock, self._fp)
1158 part = unbundlepart(self.ui, headerblock, self._fp)
1159 op = interruptoperation(self.ui)
1159 op = interruptoperation(self.ui)
1160 hardabort = False
1160 hardabort = False
1161 try:
1161 try:
1162 _processpart(op, part)
1162 _processpart(op, part)
1163 except (SystemExit, KeyboardInterrupt):
1163 except (SystemExit, KeyboardInterrupt):
1164 hardabort = True
1164 hardabort = True
1165 raise
1165 raise
1166 finally:
1166 finally:
1167 if not hardabort:
1167 if not hardabort:
1168 part.consume()
1168 part.consume()
1169 self.ui.debug('bundle2-input-stream-interrupt:'
1169 self.ui.debug('bundle2-input-stream-interrupt:'
1170 ' closing out of band context\n')
1170 ' closing out of band context\n')
1171
1171
1172 class interruptoperation(object):
1172 class interruptoperation(object):
1173 """A limited operation to be use by part handler during interruption
1173 """A limited operation to be use by part handler during interruption
1174
1174
1175 It only have access to an ui object.
1175 It only have access to an ui object.
1176 """
1176 """
1177
1177
1178 def __init__(self, ui):
1178 def __init__(self, ui):
1179 self.ui = ui
1179 self.ui = ui
1180 self.reply = None
1180 self.reply = None
1181 self.captureoutput = False
1181 self.captureoutput = False
1182
1182
1183 @property
1183 @property
1184 def repo(self):
1184 def repo(self):
1185 raise error.ProgrammingError('no repo access from stream interruption')
1185 raise error.ProgrammingError('no repo access from stream interruption')
1186
1186
1187 def gettransaction(self):
1187 def gettransaction(self):
1188 raise TransactionUnavailable('no repo access from stream interruption')
1188 raise TransactionUnavailable('no repo access from stream interruption')
1189
1189
1190 def decodepayloadchunks(ui, fh):
1190 def decodepayloadchunks(ui, fh):
1191 """Reads bundle2 part payload data into chunks.
1191 """Reads bundle2 part payload data into chunks.
1192
1192
1193 Part payload data consists of framed chunks. This function takes
1193 Part payload data consists of framed chunks. This function takes
1194 a file handle and emits those chunks.
1194 a file handle and emits those chunks.
1195 """
1195 """
1196 dolog = ui.configbool('devel', 'bundle2.debug')
1196 dolog = ui.configbool('devel', 'bundle2.debug')
1197 debug = ui.debug
1197 debug = ui.debug
1198
1198
1199 headerstruct = struct.Struct(_fpayloadsize)
1199 headerstruct = struct.Struct(_fpayloadsize)
1200 headersize = headerstruct.size
1200 headersize = headerstruct.size
1201 unpack = headerstruct.unpack
1201 unpack = headerstruct.unpack
1202
1202
1203 readexactly = changegroup.readexactly
1203 readexactly = changegroup.readexactly
1204 read = fh.read
1204 read = fh.read
1205
1205
1206 chunksize = unpack(readexactly(fh, headersize))[0]
1206 chunksize = unpack(readexactly(fh, headersize))[0]
1207 indebug(ui, 'payload chunk size: %i' % chunksize)
1207 indebug(ui, 'payload chunk size: %i' % chunksize)
1208
1208
1209 # changegroup.readexactly() is inlined below for performance.
1209 # changegroup.readexactly() is inlined below for performance.
1210 while chunksize:
1210 while chunksize:
1211 if chunksize >= 0:
1211 if chunksize >= 0:
1212 s = read(chunksize)
1212 s = read(chunksize)
1213 if len(s) < chunksize:
1213 if len(s) < chunksize:
1214 raise error.Abort(_('stream ended unexpectedly '
1214 raise error.Abort(_('stream ended unexpectedly '
1215 ' (got %d bytes, expected %d)') %
1215 ' (got %d bytes, expected %d)') %
1216 (len(s), chunksize))
1216 (len(s), chunksize))
1217
1217
1218 yield s
1218 yield s
1219 elif chunksize == flaginterrupt:
1219 elif chunksize == flaginterrupt:
1220 # Interrupt "signal" detected. The regular stream is interrupted
1220 # Interrupt "signal" detected. The regular stream is interrupted
1221 # and a bundle2 part follows. Consume it.
1221 # and a bundle2 part follows. Consume it.
1222 interrupthandler(ui, fh)()
1222 interrupthandler(ui, fh)()
1223 else:
1223 else:
1224 raise error.BundleValueError(
1224 raise error.BundleValueError(
1225 'negative payload chunk size: %s' % chunksize)
1225 'negative payload chunk size: %s' % chunksize)
1226
1226
1227 s = read(headersize)
1227 s = read(headersize)
1228 if len(s) < headersize:
1228 if len(s) < headersize:
1229 raise error.Abort(_('stream ended unexpectedly '
1229 raise error.Abort(_('stream ended unexpectedly '
1230 ' (got %d bytes, expected %d)') %
1230 ' (got %d bytes, expected %d)') %
1231 (len(s), chunksize))
1231 (len(s), chunksize))
1232
1232
1233 chunksize = unpack(s)[0]
1233 chunksize = unpack(s)[0]
1234
1234
1235 # indebug() inlined for performance.
1235 # indebug() inlined for performance.
1236 if dolog:
1236 if dolog:
1237 debug('bundle2-input: payload chunk size: %i\n' % chunksize)
1237 debug('bundle2-input: payload chunk size: %i\n' % chunksize)
1238
1238
1239 class unbundlepart(unpackermixin):
1239 class unbundlepart(unpackermixin):
1240 """a bundle part read from a bundle"""
1240 """a bundle part read from a bundle"""
1241
1241
1242 def __init__(self, ui, header, fp):
1242 def __init__(self, ui, header, fp):
1243 super(unbundlepart, self).__init__(fp)
1243 super(unbundlepart, self).__init__(fp)
1244 self._seekable = (util.safehasattr(fp, 'seek') and
1244 self._seekable = (util.safehasattr(fp, 'seek') and
1245 util.safehasattr(fp, 'tell'))
1245 util.safehasattr(fp, 'tell'))
1246 self.ui = ui
1246 self.ui = ui
1247 # unbundle state attr
1247 # unbundle state attr
1248 self._headerdata = header
1248 self._headerdata = header
1249 self._headeroffset = 0
1249 self._headeroffset = 0
1250 self._initialized = False
1250 self._initialized = False
1251 self.consumed = False
1251 self.consumed = False
1252 # part data
1252 # part data
1253 self.id = None
1253 self.id = None
1254 self.type = None
1254 self.type = None
1255 self.mandatoryparams = None
1255 self.mandatoryparams = None
1256 self.advisoryparams = None
1256 self.advisoryparams = None
1257 self.params = None
1257 self.params = None
1258 self.mandatorykeys = ()
1258 self.mandatorykeys = ()
1259 self._readheader()
1259 self._readheader()
1260 self._mandatory = None
1260 self._mandatory = None
1261 self._pos = 0
1261 self._pos = 0
1262
1262
1263 def _fromheader(self, size):
1263 def _fromheader(self, size):
1264 """return the next <size> byte from the header"""
1264 """return the next <size> byte from the header"""
1265 offset = self._headeroffset
1265 offset = self._headeroffset
1266 data = self._headerdata[offset:(offset + size)]
1266 data = self._headerdata[offset:(offset + size)]
1267 self._headeroffset = offset + size
1267 self._headeroffset = offset + size
1268 return data
1268 return data
1269
1269
1270 def _unpackheader(self, format):
1270 def _unpackheader(self, format):
1271 """read given format from header
1271 """read given format from header
1272
1272
1273 This automatically compute the size of the format to read."""
1273 This automatically compute the size of the format to read."""
1274 data = self._fromheader(struct.calcsize(format))
1274 data = self._fromheader(struct.calcsize(format))
1275 return _unpack(format, data)
1275 return _unpack(format, data)
1276
1276
1277 def _initparams(self, mandatoryparams, advisoryparams):
1277 def _initparams(self, mandatoryparams, advisoryparams):
1278 """internal function to setup all logic related parameters"""
1278 """internal function to setup all logic related parameters"""
1279 # make it read only to prevent people touching it by mistake.
1279 # make it read only to prevent people touching it by mistake.
1280 self.mandatoryparams = tuple(mandatoryparams)
1280 self.mandatoryparams = tuple(mandatoryparams)
1281 self.advisoryparams = tuple(advisoryparams)
1281 self.advisoryparams = tuple(advisoryparams)
1282 # user friendly UI
1282 # user friendly UI
1283 self.params = util.sortdict(self.mandatoryparams)
1283 self.params = util.sortdict(self.mandatoryparams)
1284 self.params.update(self.advisoryparams)
1284 self.params.update(self.advisoryparams)
1285 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1285 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1286
1286
1287 def _readheader(self):
1287 def _readheader(self):
1288 """read the header and setup the object"""
1288 """read the header and setup the object"""
1289 typesize = self._unpackheader(_fparttypesize)[0]
1289 typesize = self._unpackheader(_fparttypesize)[0]
1290 self.type = self._fromheader(typesize)
1290 self.type = self._fromheader(typesize)
1291 indebug(self.ui, 'part type: "%s"' % self.type)
1291 indebug(self.ui, 'part type: "%s"' % self.type)
1292 self.id = self._unpackheader(_fpartid)[0]
1292 self.id = self._unpackheader(_fpartid)[0]
1293 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1293 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1294 # extract mandatory bit from type
1294 # extract mandatory bit from type
1295 self.mandatory = (self.type != self.type.lower())
1295 self.mandatory = (self.type != self.type.lower())
1296 self.type = self.type.lower()
1296 self.type = self.type.lower()
1297 ## reading parameters
1297 ## reading parameters
1298 # param count
1298 # param count
1299 mancount, advcount = self._unpackheader(_fpartparamcount)
1299 mancount, advcount = self._unpackheader(_fpartparamcount)
1300 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1300 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1301 # param size
1301 # param size
1302 fparamsizes = _makefpartparamsizes(mancount + advcount)
1302 fparamsizes = _makefpartparamsizes(mancount + advcount)
1303 paramsizes = self._unpackheader(fparamsizes)
1303 paramsizes = self._unpackheader(fparamsizes)
1304 # make it a list of couple again
1304 # make it a list of couple again
1305 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1305 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1306 # split mandatory from advisory
1306 # split mandatory from advisory
1307 mansizes = paramsizes[:mancount]
1307 mansizes = paramsizes[:mancount]
1308 advsizes = paramsizes[mancount:]
1308 advsizes = paramsizes[mancount:]
1309 # retrieve param value
1309 # retrieve param value
1310 manparams = []
1310 manparams = []
1311 for key, value in mansizes:
1311 for key, value in mansizes:
1312 manparams.append((self._fromheader(key), self._fromheader(value)))
1312 manparams.append((self._fromheader(key), self._fromheader(value)))
1313 advparams = []
1313 advparams = []
1314 for key, value in advsizes:
1314 for key, value in advsizes:
1315 advparams.append((self._fromheader(key), self._fromheader(value)))
1315 advparams.append((self._fromheader(key), self._fromheader(value)))
1316 self._initparams(manparams, advparams)
1316 self._initparams(manparams, advparams)
1317 ## part payload
1317 ## part payload
1318 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1318 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1319 # we read the data, tell it
1319 # we read the data, tell it
1320 self._initialized = True
1320 self._initialized = True
1321
1321
1322 def _payloadchunks(self):
1322 def _payloadchunks(self):
1323 """Generator of decoded chunks in the payload."""
1323 """Generator of decoded chunks in the payload."""
1324 return decodepayloadchunks(self.ui, self._fp)
1324 return decodepayloadchunks(self.ui, self._fp)
1325
1325
1326 def consume(self):
1326 def consume(self):
1327 """Read the part payload until completion.
1327 """Read the part payload until completion.
1328
1328
1329 By consuming the part data, the underlying stream read offset will
1329 By consuming the part data, the underlying stream read offset will
1330 be advanced to the next part (or end of stream).
1330 be advanced to the next part (or end of stream).
1331 """
1331 """
1332 if self.consumed:
1332 if self.consumed:
1333 return
1333 return
1334
1334
1335 chunk = self.read(32768)
1335 chunk = self.read(32768)
1336 while chunk:
1336 while chunk:
1337 self._pos += len(chunk)
1337 self._pos += len(chunk)
1338 chunk = self.read(32768)
1338 chunk = self.read(32768)
1339
1339
1340 def read(self, size=None):
1340 def read(self, size=None):
1341 """read payload data"""
1341 """read payload data"""
1342 if not self._initialized:
1342 if not self._initialized:
1343 self._readheader()
1343 self._readheader()
1344 if size is None:
1344 if size is None:
1345 data = self._payloadstream.read()
1345 data = self._payloadstream.read()
1346 else:
1346 else:
1347 data = self._payloadstream.read(size)
1347 data = self._payloadstream.read(size)
1348 self._pos += len(data)
1348 self._pos += len(data)
1349 if size is None or len(data) < size:
1349 if size is None or len(data) < size:
1350 if not self.consumed and self._pos:
1350 if not self.consumed and self._pos:
1351 self.ui.debug('bundle2-input-part: total payload size %i\n'
1351 self.ui.debug('bundle2-input-part: total payload size %i\n'
1352 % self._pos)
1352 % self._pos)
1353 self.consumed = True
1353 self.consumed = True
1354 return data
1354 return data
1355
1355
1356 class seekableunbundlepart(unbundlepart):
1356 class seekableunbundlepart(unbundlepart):
1357 """A bundle2 part in a bundle that is seekable.
1357 """A bundle2 part in a bundle that is seekable.
1358
1358
1359 Regular ``unbundlepart`` instances can only be read once. This class
1359 Regular ``unbundlepart`` instances can only be read once. This class
1360 extends ``unbundlepart`` to enable bi-directional seeking within the
1360 extends ``unbundlepart`` to enable bi-directional seeking within the
1361 part.
1361 part.
1362
1362
1363 Bundle2 part data consists of framed chunks. Offsets when seeking
1363 Bundle2 part data consists of framed chunks. Offsets when seeking
1364 refer to the decoded data, not the offsets in the underlying bundle2
1364 refer to the decoded data, not the offsets in the underlying bundle2
1365 stream.
1365 stream.
1366
1366
1367 To facilitate quickly seeking within the decoded data, instances of this
1367 To facilitate quickly seeking within the decoded data, instances of this
1368 class maintain a mapping between offsets in the underlying stream and
1368 class maintain a mapping between offsets in the underlying stream and
1369 the decoded payload. This mapping will consume memory in proportion
1369 the decoded payload. This mapping will consume memory in proportion
1370 to the number of chunks within the payload (which almost certainly
1370 to the number of chunks within the payload (which almost certainly
1371 increases in proportion with the size of the part).
1371 increases in proportion with the size of the part).
1372 """
1372 """
1373 def __init__(self, ui, header, fp):
1373 def __init__(self, ui, header, fp):
1374 # (payload, file) offsets for chunk starts.
1374 # (payload, file) offsets for chunk starts.
1375 self._chunkindex = []
1375 self._chunkindex = []
1376
1376
1377 super(seekableunbundlepart, self).__init__(ui, header, fp)
1377 super(seekableunbundlepart, self).__init__(ui, header, fp)
1378
1378
1379 def _payloadchunks(self, chunknum=0):
1379 def _payloadchunks(self, chunknum=0):
1380 '''seek to specified chunk and start yielding data'''
1380 '''seek to specified chunk and start yielding data'''
1381 if len(self._chunkindex) == 0:
1381 if len(self._chunkindex) == 0:
1382 assert chunknum == 0, 'Must start with chunk 0'
1382 assert chunknum == 0, 'Must start with chunk 0'
1383 self._chunkindex.append((0, self._tellfp()))
1383 self._chunkindex.append((0, self._tellfp()))
1384 else:
1384 else:
1385 assert chunknum < len(self._chunkindex), \
1385 assert chunknum < len(self._chunkindex), \
1386 'Unknown chunk %d' % chunknum
1386 'Unknown chunk %d' % chunknum
1387 self._seekfp(self._chunkindex[chunknum][1])
1387 self._seekfp(self._chunkindex[chunknum][1])
1388
1388
1389 pos = self._chunkindex[chunknum][0]
1389 pos = self._chunkindex[chunknum][0]
1390
1390
1391 for chunk in decodepayloadchunks(self.ui, self._fp):
1391 for chunk in decodepayloadchunks(self.ui, self._fp):
1392 chunknum += 1
1392 chunknum += 1
1393 pos += len(chunk)
1393 pos += len(chunk)
1394 if chunknum == len(self._chunkindex):
1394 if chunknum == len(self._chunkindex):
1395 self._chunkindex.append((pos, self._tellfp()))
1395 self._chunkindex.append((pos, self._tellfp()))
1396
1396
1397 yield chunk
1397 yield chunk
1398
1398
1399 def _findchunk(self, pos):
1399 def _findchunk(self, pos):
1400 '''for a given payload position, return a chunk number and offset'''
1400 '''for a given payload position, return a chunk number and offset'''
1401 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1401 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1402 if ppos == pos:
1402 if ppos == pos:
1403 return chunk, 0
1403 return chunk, 0
1404 elif ppos > pos:
1404 elif ppos > pos:
1405 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1405 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1406 raise ValueError('Unknown chunk')
1406 raise ValueError('Unknown chunk')
1407
1407
1408 def tell(self):
1408 def tell(self):
1409 return self._pos
1409 return self._pos
1410
1410
1411 def seek(self, offset, whence=os.SEEK_SET):
1411 def seek(self, offset, whence=os.SEEK_SET):
1412 if whence == os.SEEK_SET:
1412 if whence == os.SEEK_SET:
1413 newpos = offset
1413 newpos = offset
1414 elif whence == os.SEEK_CUR:
1414 elif whence == os.SEEK_CUR:
1415 newpos = self._pos + offset
1415 newpos = self._pos + offset
1416 elif whence == os.SEEK_END:
1416 elif whence == os.SEEK_END:
1417 if not self.consumed:
1417 if not self.consumed:
1418 self.read()
1418 # Can't use self.consume() here because it advances self._pos.
1419 chunk = self.read(32768)
1420 while chunk:
1421 chunk = self.read(32768)
1419 newpos = self._chunkindex[-1][0] - offset
1422 newpos = self._chunkindex[-1][0] - offset
1420 else:
1423 else:
1421 raise ValueError('Unknown whence value: %r' % (whence,))
1424 raise ValueError('Unknown whence value: %r' % (whence,))
1422
1425
1423 if newpos > self._chunkindex[-1][0] and not self.consumed:
1426 if newpos > self._chunkindex[-1][0] and not self.consumed:
1424 self.read()
1427 # Can't use self.consume() here because it advances self._pos.
1428 chunk = self.read(32768)
1429 while chunk:
1430 chunk = self.read(32668)
1431
1425 if not 0 <= newpos <= self._chunkindex[-1][0]:
1432 if not 0 <= newpos <= self._chunkindex[-1][0]:
1426 raise ValueError('Offset out of range')
1433 raise ValueError('Offset out of range')
1427
1434
1428 if self._pos != newpos:
1435 if self._pos != newpos:
1429 chunk, internaloffset = self._findchunk(newpos)
1436 chunk, internaloffset = self._findchunk(newpos)
1430 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1437 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1431 adjust = self.read(internaloffset)
1438 adjust = self.read(internaloffset)
1432 if len(adjust) != internaloffset:
1439 if len(adjust) != internaloffset:
1433 raise error.Abort(_('Seek failed\n'))
1440 raise error.Abort(_('Seek failed\n'))
1434 self._pos = newpos
1441 self._pos = newpos
1435
1442
1436 def _seekfp(self, offset, whence=0):
1443 def _seekfp(self, offset, whence=0):
1437 """move the underlying file pointer
1444 """move the underlying file pointer
1438
1445
1439 This method is meant for internal usage by the bundle2 protocol only.
1446 This method is meant for internal usage by the bundle2 protocol only.
1440 They directly manipulate the low level stream including bundle2 level
1447 They directly manipulate the low level stream including bundle2 level
1441 instruction.
1448 instruction.
1442
1449
1443 Do not use it to implement higher-level logic or methods."""
1450 Do not use it to implement higher-level logic or methods."""
1444 if self._seekable:
1451 if self._seekable:
1445 return self._fp.seek(offset, whence)
1452 return self._fp.seek(offset, whence)
1446 else:
1453 else:
1447 raise NotImplementedError(_('File pointer is not seekable'))
1454 raise NotImplementedError(_('File pointer is not seekable'))
1448
1455
1449 def _tellfp(self):
1456 def _tellfp(self):
1450 """return the file offset, or None if file is not seekable
1457 """return the file offset, or None if file is not seekable
1451
1458
1452 This method is meant for internal usage by the bundle2 protocol only.
1459 This method is meant for internal usage by the bundle2 protocol only.
1453 They directly manipulate the low level stream including bundle2 level
1460 They directly manipulate the low level stream including bundle2 level
1454 instruction.
1461 instruction.
1455
1462
1456 Do not use it to implement higher-level logic or methods."""
1463 Do not use it to implement higher-level logic or methods."""
1457 if self._seekable:
1464 if self._seekable:
1458 try:
1465 try:
1459 return self._fp.tell()
1466 return self._fp.tell()
1460 except IOError as e:
1467 except IOError as e:
1461 if e.errno == errno.ESPIPE:
1468 if e.errno == errno.ESPIPE:
1462 self._seekable = False
1469 self._seekable = False
1463 else:
1470 else:
1464 raise
1471 raise
1465 return None
1472 return None
1466
1473
1467 # These are only the static capabilities.
1474 # These are only the static capabilities.
1468 # Check the 'getrepocaps' function for the rest.
1475 # Check the 'getrepocaps' function for the rest.
1469 capabilities = {'HG20': (),
1476 capabilities = {'HG20': (),
1470 'error': ('abort', 'unsupportedcontent', 'pushraced',
1477 'error': ('abort', 'unsupportedcontent', 'pushraced',
1471 'pushkey'),
1478 'pushkey'),
1472 'listkeys': (),
1479 'listkeys': (),
1473 'pushkey': (),
1480 'pushkey': (),
1474 'digests': tuple(sorted(util.DIGESTS.keys())),
1481 'digests': tuple(sorted(util.DIGESTS.keys())),
1475 'remote-changegroup': ('http', 'https'),
1482 'remote-changegroup': ('http', 'https'),
1476 'hgtagsfnodes': (),
1483 'hgtagsfnodes': (),
1477 'phases': ('heads',),
1484 'phases': ('heads',),
1478 }
1485 }
1479
1486
1480 def getrepocaps(repo, allowpushback=False):
1487 def getrepocaps(repo, allowpushback=False):
1481 """return the bundle2 capabilities for a given repo
1488 """return the bundle2 capabilities for a given repo
1482
1489
1483 Exists to allow extensions (like evolution) to mutate the capabilities.
1490 Exists to allow extensions (like evolution) to mutate the capabilities.
1484 """
1491 """
1485 caps = capabilities.copy()
1492 caps = capabilities.copy()
1486 caps['changegroup'] = tuple(sorted(
1493 caps['changegroup'] = tuple(sorted(
1487 changegroup.supportedincomingversions(repo)))
1494 changegroup.supportedincomingversions(repo)))
1488 if obsolete.isenabled(repo, obsolete.exchangeopt):
1495 if obsolete.isenabled(repo, obsolete.exchangeopt):
1489 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1496 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1490 caps['obsmarkers'] = supportedformat
1497 caps['obsmarkers'] = supportedformat
1491 if allowpushback:
1498 if allowpushback:
1492 caps['pushback'] = ()
1499 caps['pushback'] = ()
1493 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1500 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1494 if cpmode == 'check-related':
1501 if cpmode == 'check-related':
1495 caps['checkheads'] = ('related',)
1502 caps['checkheads'] = ('related',)
1496 if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
1503 if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
1497 caps.pop('phases')
1504 caps.pop('phases')
1498 return caps
1505 return caps
1499
1506
1500 def bundle2caps(remote):
1507 def bundle2caps(remote):
1501 """return the bundle capabilities of a peer as dict"""
1508 """return the bundle capabilities of a peer as dict"""
1502 raw = remote.capable('bundle2')
1509 raw = remote.capable('bundle2')
1503 if not raw and raw != '':
1510 if not raw and raw != '':
1504 return {}
1511 return {}
1505 capsblob = urlreq.unquote(remote.capable('bundle2'))
1512 capsblob = urlreq.unquote(remote.capable('bundle2'))
1506 return decodecaps(capsblob)
1513 return decodecaps(capsblob)
1507
1514
1508 def obsmarkersversion(caps):
1515 def obsmarkersversion(caps):
1509 """extract the list of supported obsmarkers versions from a bundle2caps dict
1516 """extract the list of supported obsmarkers versions from a bundle2caps dict
1510 """
1517 """
1511 obscaps = caps.get('obsmarkers', ())
1518 obscaps = caps.get('obsmarkers', ())
1512 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1519 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1513
1520
1514 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1521 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1515 vfs=None, compression=None, compopts=None):
1522 vfs=None, compression=None, compopts=None):
1516 if bundletype.startswith('HG10'):
1523 if bundletype.startswith('HG10'):
1517 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1524 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1518 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1525 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1519 compression=compression, compopts=compopts)
1526 compression=compression, compopts=compopts)
1520 elif not bundletype.startswith('HG20'):
1527 elif not bundletype.startswith('HG20'):
1521 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1528 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1522
1529
1523 caps = {}
1530 caps = {}
1524 if 'obsolescence' in opts:
1531 if 'obsolescence' in opts:
1525 caps['obsmarkers'] = ('V1',)
1532 caps['obsmarkers'] = ('V1',)
1526 bundle = bundle20(ui, caps)
1533 bundle = bundle20(ui, caps)
1527 bundle.setcompression(compression, compopts)
1534 bundle.setcompression(compression, compopts)
1528 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1535 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1529 chunkiter = bundle.getchunks()
1536 chunkiter = bundle.getchunks()
1530
1537
1531 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1538 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1532
1539
1533 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1540 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1534 # We should eventually reconcile this logic with the one behind
1541 # We should eventually reconcile this logic with the one behind
1535 # 'exchange.getbundle2partsgenerator'.
1542 # 'exchange.getbundle2partsgenerator'.
1536 #
1543 #
1537 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1544 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1538 # different right now. So we keep them separated for now for the sake of
1545 # different right now. So we keep them separated for now for the sake of
1539 # simplicity.
1546 # simplicity.
1540
1547
1541 # we always want a changegroup in such bundle
1548 # we always want a changegroup in such bundle
1542 cgversion = opts.get('cg.version')
1549 cgversion = opts.get('cg.version')
1543 if cgversion is None:
1550 if cgversion is None:
1544 cgversion = changegroup.safeversion(repo)
1551 cgversion = changegroup.safeversion(repo)
1545 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1552 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1546 part = bundler.newpart('changegroup', data=cg.getchunks())
1553 part = bundler.newpart('changegroup', data=cg.getchunks())
1547 part.addparam('version', cg.version)
1554 part.addparam('version', cg.version)
1548 if 'clcount' in cg.extras:
1555 if 'clcount' in cg.extras:
1549 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1556 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1550 mandatory=False)
1557 mandatory=False)
1551 if opts.get('phases') and repo.revs('%ln and secret()',
1558 if opts.get('phases') and repo.revs('%ln and secret()',
1552 outgoing.missingheads):
1559 outgoing.missingheads):
1553 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1560 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1554
1561
1555 addparttagsfnodescache(repo, bundler, outgoing)
1562 addparttagsfnodescache(repo, bundler, outgoing)
1556
1563
1557 if opts.get('obsolescence', False):
1564 if opts.get('obsolescence', False):
1558 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1565 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1559 buildobsmarkerspart(bundler, obsmarkers)
1566 buildobsmarkerspart(bundler, obsmarkers)
1560
1567
1561 if opts.get('phases', False):
1568 if opts.get('phases', False):
1562 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1569 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1563 phasedata = phases.binaryencode(headsbyphase)
1570 phasedata = phases.binaryencode(headsbyphase)
1564 bundler.newpart('phase-heads', data=phasedata)
1571 bundler.newpart('phase-heads', data=phasedata)
1565
1572
1566 def addparttagsfnodescache(repo, bundler, outgoing):
1573 def addparttagsfnodescache(repo, bundler, outgoing):
1567 # we include the tags fnode cache for the bundle changeset
1574 # we include the tags fnode cache for the bundle changeset
1568 # (as an optional parts)
1575 # (as an optional parts)
1569 cache = tags.hgtagsfnodescache(repo.unfiltered())
1576 cache = tags.hgtagsfnodescache(repo.unfiltered())
1570 chunks = []
1577 chunks = []
1571
1578
1572 # .hgtags fnodes are only relevant for head changesets. While we could
1579 # .hgtags fnodes are only relevant for head changesets. While we could
1573 # transfer values for all known nodes, there will likely be little to
1580 # transfer values for all known nodes, there will likely be little to
1574 # no benefit.
1581 # no benefit.
1575 #
1582 #
1576 # We don't bother using a generator to produce output data because
1583 # We don't bother using a generator to produce output data because
1577 # a) we only have 40 bytes per head and even esoteric numbers of heads
1584 # a) we only have 40 bytes per head and even esoteric numbers of heads
1578 # consume little memory (1M heads is 40MB) b) we don't want to send the
1585 # consume little memory (1M heads is 40MB) b) we don't want to send the
1579 # part if we don't have entries and knowing if we have entries requires
1586 # part if we don't have entries and knowing if we have entries requires
1580 # cache lookups.
1587 # cache lookups.
1581 for node in outgoing.missingheads:
1588 for node in outgoing.missingheads:
1582 # Don't compute missing, as this may slow down serving.
1589 # Don't compute missing, as this may slow down serving.
1583 fnode = cache.getfnode(node, computemissing=False)
1590 fnode = cache.getfnode(node, computemissing=False)
1584 if fnode is not None:
1591 if fnode is not None:
1585 chunks.extend([node, fnode])
1592 chunks.extend([node, fnode])
1586
1593
1587 if chunks:
1594 if chunks:
1588 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1595 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1589
1596
1590 def buildobsmarkerspart(bundler, markers):
1597 def buildobsmarkerspart(bundler, markers):
1591 """add an obsmarker part to the bundler with <markers>
1598 """add an obsmarker part to the bundler with <markers>
1592
1599
1593 No part is created if markers is empty.
1600 No part is created if markers is empty.
1594 Raises ValueError if the bundler doesn't support any known obsmarker format.
1601 Raises ValueError if the bundler doesn't support any known obsmarker format.
1595 """
1602 """
1596 if not markers:
1603 if not markers:
1597 return None
1604 return None
1598
1605
1599 remoteversions = obsmarkersversion(bundler.capabilities)
1606 remoteversions = obsmarkersversion(bundler.capabilities)
1600 version = obsolete.commonversion(remoteversions)
1607 version = obsolete.commonversion(remoteversions)
1601 if version is None:
1608 if version is None:
1602 raise ValueError('bundler does not support common obsmarker format')
1609 raise ValueError('bundler does not support common obsmarker format')
1603 stream = obsolete.encodemarkers(markers, True, version=version)
1610 stream = obsolete.encodemarkers(markers, True, version=version)
1604 return bundler.newpart('obsmarkers', data=stream)
1611 return bundler.newpart('obsmarkers', data=stream)
1605
1612
1606 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1613 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1607 compopts=None):
1614 compopts=None):
1608 """Write a bundle file and return its filename.
1615 """Write a bundle file and return its filename.
1609
1616
1610 Existing files will not be overwritten.
1617 Existing files will not be overwritten.
1611 If no filename is specified, a temporary file is created.
1618 If no filename is specified, a temporary file is created.
1612 bz2 compression can be turned off.
1619 bz2 compression can be turned off.
1613 The bundle file will be deleted in case of errors.
1620 The bundle file will be deleted in case of errors.
1614 """
1621 """
1615
1622
1616 if bundletype == "HG20":
1623 if bundletype == "HG20":
1617 bundle = bundle20(ui)
1624 bundle = bundle20(ui)
1618 bundle.setcompression(compression, compopts)
1625 bundle.setcompression(compression, compopts)
1619 part = bundle.newpart('changegroup', data=cg.getchunks())
1626 part = bundle.newpart('changegroup', data=cg.getchunks())
1620 part.addparam('version', cg.version)
1627 part.addparam('version', cg.version)
1621 if 'clcount' in cg.extras:
1628 if 'clcount' in cg.extras:
1622 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1629 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1623 mandatory=False)
1630 mandatory=False)
1624 chunkiter = bundle.getchunks()
1631 chunkiter = bundle.getchunks()
1625 else:
1632 else:
1626 # compression argument is only for the bundle2 case
1633 # compression argument is only for the bundle2 case
1627 assert compression is None
1634 assert compression is None
1628 if cg.version != '01':
1635 if cg.version != '01':
1629 raise error.Abort(_('old bundle types only supports v1 '
1636 raise error.Abort(_('old bundle types only supports v1 '
1630 'changegroups'))
1637 'changegroups'))
1631 header, comp = bundletypes[bundletype]
1638 header, comp = bundletypes[bundletype]
1632 if comp not in util.compengines.supportedbundletypes:
1639 if comp not in util.compengines.supportedbundletypes:
1633 raise error.Abort(_('unknown stream compression type: %s')
1640 raise error.Abort(_('unknown stream compression type: %s')
1634 % comp)
1641 % comp)
1635 compengine = util.compengines.forbundletype(comp)
1642 compengine = util.compengines.forbundletype(comp)
1636 def chunkiter():
1643 def chunkiter():
1637 yield header
1644 yield header
1638 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1645 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1639 yield chunk
1646 yield chunk
1640 chunkiter = chunkiter()
1647 chunkiter = chunkiter()
1641
1648
1642 # parse the changegroup data, otherwise we will block
1649 # parse the changegroup data, otherwise we will block
1643 # in case of sshrepo because we don't know the end of the stream
1650 # in case of sshrepo because we don't know the end of the stream
1644 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1651 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1645
1652
1646 def combinechangegroupresults(op):
1653 def combinechangegroupresults(op):
1647 """logic to combine 0 or more addchangegroup results into one"""
1654 """logic to combine 0 or more addchangegroup results into one"""
1648 results = [r.get('return', 0)
1655 results = [r.get('return', 0)
1649 for r in op.records['changegroup']]
1656 for r in op.records['changegroup']]
1650 changedheads = 0
1657 changedheads = 0
1651 result = 1
1658 result = 1
1652 for ret in results:
1659 for ret in results:
1653 # If any changegroup result is 0, return 0
1660 # If any changegroup result is 0, return 0
1654 if ret == 0:
1661 if ret == 0:
1655 result = 0
1662 result = 0
1656 break
1663 break
1657 if ret < -1:
1664 if ret < -1:
1658 changedheads += ret + 1
1665 changedheads += ret + 1
1659 elif ret > 1:
1666 elif ret > 1:
1660 changedheads += ret - 1
1667 changedheads += ret - 1
1661 if changedheads > 0:
1668 if changedheads > 0:
1662 result = 1 + changedheads
1669 result = 1 + changedheads
1663 elif changedheads < 0:
1670 elif changedheads < 0:
1664 result = -1 + changedheads
1671 result = -1 + changedheads
1665 return result
1672 return result
1666
1673
1667 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1674 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1668 'targetphase'))
1675 'targetphase'))
1669 def handlechangegroup(op, inpart):
1676 def handlechangegroup(op, inpart):
1670 """apply a changegroup part on the repo
1677 """apply a changegroup part on the repo
1671
1678
1672 This is a very early implementation that will massive rework before being
1679 This is a very early implementation that will massive rework before being
1673 inflicted to any end-user.
1680 inflicted to any end-user.
1674 """
1681 """
1675 tr = op.gettransaction()
1682 tr = op.gettransaction()
1676 unpackerversion = inpart.params.get('version', '01')
1683 unpackerversion = inpart.params.get('version', '01')
1677 # We should raise an appropriate exception here
1684 # We should raise an appropriate exception here
1678 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1685 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1679 # the source and url passed here are overwritten by the one contained in
1686 # the source and url passed here are overwritten by the one contained in
1680 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1687 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1681 nbchangesets = None
1688 nbchangesets = None
1682 if 'nbchanges' in inpart.params:
1689 if 'nbchanges' in inpart.params:
1683 nbchangesets = int(inpart.params.get('nbchanges'))
1690 nbchangesets = int(inpart.params.get('nbchanges'))
1684 if ('treemanifest' in inpart.params and
1691 if ('treemanifest' in inpart.params and
1685 'treemanifest' not in op.repo.requirements):
1692 'treemanifest' not in op.repo.requirements):
1686 if len(op.repo.changelog) != 0:
1693 if len(op.repo.changelog) != 0:
1687 raise error.Abort(_(
1694 raise error.Abort(_(
1688 "bundle contains tree manifests, but local repo is "
1695 "bundle contains tree manifests, but local repo is "
1689 "non-empty and does not use tree manifests"))
1696 "non-empty and does not use tree manifests"))
1690 op.repo.requirements.add('treemanifest')
1697 op.repo.requirements.add('treemanifest')
1691 op.repo._applyopenerreqs()
1698 op.repo._applyopenerreqs()
1692 op.repo._writerequirements()
1699 op.repo._writerequirements()
1693 extrakwargs = {}
1700 extrakwargs = {}
1694 targetphase = inpart.params.get('targetphase')
1701 targetphase = inpart.params.get('targetphase')
1695 if targetphase is not None:
1702 if targetphase is not None:
1696 extrakwargs['targetphase'] = int(targetphase)
1703 extrakwargs['targetphase'] = int(targetphase)
1697 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1704 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1698 expectedtotal=nbchangesets, **extrakwargs)
1705 expectedtotal=nbchangesets, **extrakwargs)
1699 if op.reply is not None:
1706 if op.reply is not None:
1700 # This is definitely not the final form of this
1707 # This is definitely not the final form of this
1701 # return. But one need to start somewhere.
1708 # return. But one need to start somewhere.
1702 part = op.reply.newpart('reply:changegroup', mandatory=False)
1709 part = op.reply.newpart('reply:changegroup', mandatory=False)
1703 part.addparam(
1710 part.addparam(
1704 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1711 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1705 part.addparam('return', '%i' % ret, mandatory=False)
1712 part.addparam('return', '%i' % ret, mandatory=False)
1706 assert not inpart.read()
1713 assert not inpart.read()
1707
1714
1708 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1715 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1709 ['digest:%s' % k for k in util.DIGESTS.keys()])
1716 ['digest:%s' % k for k in util.DIGESTS.keys()])
1710 @parthandler('remote-changegroup', _remotechangegroupparams)
1717 @parthandler('remote-changegroup', _remotechangegroupparams)
1711 def handleremotechangegroup(op, inpart):
1718 def handleremotechangegroup(op, inpart):
1712 """apply a bundle10 on the repo, given an url and validation information
1719 """apply a bundle10 on the repo, given an url and validation information
1713
1720
1714 All the information about the remote bundle to import are given as
1721 All the information about the remote bundle to import are given as
1715 parameters. The parameters include:
1722 parameters. The parameters include:
1716 - url: the url to the bundle10.
1723 - url: the url to the bundle10.
1717 - size: the bundle10 file size. It is used to validate what was
1724 - size: the bundle10 file size. It is used to validate what was
1718 retrieved by the client matches the server knowledge about the bundle.
1725 retrieved by the client matches the server knowledge about the bundle.
1719 - digests: a space separated list of the digest types provided as
1726 - digests: a space separated list of the digest types provided as
1720 parameters.
1727 parameters.
1721 - digest:<digest-type>: the hexadecimal representation of the digest with
1728 - digest:<digest-type>: the hexadecimal representation of the digest with
1722 that name. Like the size, it is used to validate what was retrieved by
1729 that name. Like the size, it is used to validate what was retrieved by
1723 the client matches what the server knows about the bundle.
1730 the client matches what the server knows about the bundle.
1724
1731
1725 When multiple digest types are given, all of them are checked.
1732 When multiple digest types are given, all of them are checked.
1726 """
1733 """
1727 try:
1734 try:
1728 raw_url = inpart.params['url']
1735 raw_url = inpart.params['url']
1729 except KeyError:
1736 except KeyError:
1730 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1737 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1731 parsed_url = util.url(raw_url)
1738 parsed_url = util.url(raw_url)
1732 if parsed_url.scheme not in capabilities['remote-changegroup']:
1739 if parsed_url.scheme not in capabilities['remote-changegroup']:
1733 raise error.Abort(_('remote-changegroup does not support %s urls') %
1740 raise error.Abort(_('remote-changegroup does not support %s urls') %
1734 parsed_url.scheme)
1741 parsed_url.scheme)
1735
1742
1736 try:
1743 try:
1737 size = int(inpart.params['size'])
1744 size = int(inpart.params['size'])
1738 except ValueError:
1745 except ValueError:
1739 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1746 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1740 % 'size')
1747 % 'size')
1741 except KeyError:
1748 except KeyError:
1742 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1749 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1743
1750
1744 digests = {}
1751 digests = {}
1745 for typ in inpart.params.get('digests', '').split():
1752 for typ in inpart.params.get('digests', '').split():
1746 param = 'digest:%s' % typ
1753 param = 'digest:%s' % typ
1747 try:
1754 try:
1748 value = inpart.params[param]
1755 value = inpart.params[param]
1749 except KeyError:
1756 except KeyError:
1750 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1757 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1751 param)
1758 param)
1752 digests[typ] = value
1759 digests[typ] = value
1753
1760
1754 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1761 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1755
1762
1756 tr = op.gettransaction()
1763 tr = op.gettransaction()
1757 from . import exchange
1764 from . import exchange
1758 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1765 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1759 if not isinstance(cg, changegroup.cg1unpacker):
1766 if not isinstance(cg, changegroup.cg1unpacker):
1760 raise error.Abort(_('%s: not a bundle version 1.0') %
1767 raise error.Abort(_('%s: not a bundle version 1.0') %
1761 util.hidepassword(raw_url))
1768 util.hidepassword(raw_url))
1762 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1769 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1763 if op.reply is not None:
1770 if op.reply is not None:
1764 # This is definitely not the final form of this
1771 # This is definitely not the final form of this
1765 # return. But one need to start somewhere.
1772 # return. But one need to start somewhere.
1766 part = op.reply.newpart('reply:changegroup')
1773 part = op.reply.newpart('reply:changegroup')
1767 part.addparam(
1774 part.addparam(
1768 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1775 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1769 part.addparam('return', '%i' % ret, mandatory=False)
1776 part.addparam('return', '%i' % ret, mandatory=False)
1770 try:
1777 try:
1771 real_part.validate()
1778 real_part.validate()
1772 except error.Abort as e:
1779 except error.Abort as e:
1773 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1780 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1774 (util.hidepassword(raw_url), str(e)))
1781 (util.hidepassword(raw_url), str(e)))
1775 assert not inpart.read()
1782 assert not inpart.read()
1776
1783
1777 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1784 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1778 def handlereplychangegroup(op, inpart):
1785 def handlereplychangegroup(op, inpart):
1779 ret = int(inpart.params['return'])
1786 ret = int(inpart.params['return'])
1780 replyto = int(inpart.params['in-reply-to'])
1787 replyto = int(inpart.params['in-reply-to'])
1781 op.records.add('changegroup', {'return': ret}, replyto)
1788 op.records.add('changegroup', {'return': ret}, replyto)
1782
1789
1783 @parthandler('check:heads')
1790 @parthandler('check:heads')
1784 def handlecheckheads(op, inpart):
1791 def handlecheckheads(op, inpart):
1785 """check that head of the repo did not change
1792 """check that head of the repo did not change
1786
1793
1787 This is used to detect a push race when using unbundle.
1794 This is used to detect a push race when using unbundle.
1788 This replaces the "heads" argument of unbundle."""
1795 This replaces the "heads" argument of unbundle."""
1789 h = inpart.read(20)
1796 h = inpart.read(20)
1790 heads = []
1797 heads = []
1791 while len(h) == 20:
1798 while len(h) == 20:
1792 heads.append(h)
1799 heads.append(h)
1793 h = inpart.read(20)
1800 h = inpart.read(20)
1794 assert not h
1801 assert not h
1795 # Trigger a transaction so that we are guaranteed to have the lock now.
1802 # Trigger a transaction so that we are guaranteed to have the lock now.
1796 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1803 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1797 op.gettransaction()
1804 op.gettransaction()
1798 if sorted(heads) != sorted(op.repo.heads()):
1805 if sorted(heads) != sorted(op.repo.heads()):
1799 raise error.PushRaced('repository changed while pushing - '
1806 raise error.PushRaced('repository changed while pushing - '
1800 'please try again')
1807 'please try again')
1801
1808
1802 @parthandler('check:updated-heads')
1809 @parthandler('check:updated-heads')
1803 def handlecheckupdatedheads(op, inpart):
1810 def handlecheckupdatedheads(op, inpart):
1804 """check for race on the heads touched by a push
1811 """check for race on the heads touched by a push
1805
1812
1806 This is similar to 'check:heads' but focus on the heads actually updated
1813 This is similar to 'check:heads' but focus on the heads actually updated
1807 during the push. If other activities happen on unrelated heads, it is
1814 during the push. If other activities happen on unrelated heads, it is
1808 ignored.
1815 ignored.
1809
1816
1810 This allow server with high traffic to avoid push contention as long as
1817 This allow server with high traffic to avoid push contention as long as
1811 unrelated parts of the graph are involved."""
1818 unrelated parts of the graph are involved."""
1812 h = inpart.read(20)
1819 h = inpart.read(20)
1813 heads = []
1820 heads = []
1814 while len(h) == 20:
1821 while len(h) == 20:
1815 heads.append(h)
1822 heads.append(h)
1816 h = inpart.read(20)
1823 h = inpart.read(20)
1817 assert not h
1824 assert not h
1818 # trigger a transaction so that we are guaranteed to have the lock now.
1825 # trigger a transaction so that we are guaranteed to have the lock now.
1819 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1826 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1820 op.gettransaction()
1827 op.gettransaction()
1821
1828
1822 currentheads = set()
1829 currentheads = set()
1823 for ls in op.repo.branchmap().itervalues():
1830 for ls in op.repo.branchmap().itervalues():
1824 currentheads.update(ls)
1831 currentheads.update(ls)
1825
1832
1826 for h in heads:
1833 for h in heads:
1827 if h not in currentheads:
1834 if h not in currentheads:
1828 raise error.PushRaced('repository changed while pushing - '
1835 raise error.PushRaced('repository changed while pushing - '
1829 'please try again')
1836 'please try again')
1830
1837
1831 @parthandler('check:phases')
1838 @parthandler('check:phases')
1832 def handlecheckphases(op, inpart):
1839 def handlecheckphases(op, inpart):
1833 """check that phase boundaries of the repository did not change
1840 """check that phase boundaries of the repository did not change
1834
1841
1835 This is used to detect a push race.
1842 This is used to detect a push race.
1836 """
1843 """
1837 phasetonodes = phases.binarydecode(inpart)
1844 phasetonodes = phases.binarydecode(inpart)
1838 unfi = op.repo.unfiltered()
1845 unfi = op.repo.unfiltered()
1839 cl = unfi.changelog
1846 cl = unfi.changelog
1840 phasecache = unfi._phasecache
1847 phasecache = unfi._phasecache
1841 msg = ('repository changed while pushing - please try again '
1848 msg = ('repository changed while pushing - please try again '
1842 '(%s is %s expected %s)')
1849 '(%s is %s expected %s)')
1843 for expectedphase, nodes in enumerate(phasetonodes):
1850 for expectedphase, nodes in enumerate(phasetonodes):
1844 for n in nodes:
1851 for n in nodes:
1845 actualphase = phasecache.phase(unfi, cl.rev(n))
1852 actualphase = phasecache.phase(unfi, cl.rev(n))
1846 if actualphase != expectedphase:
1853 if actualphase != expectedphase:
1847 finalmsg = msg % (nodemod.short(n),
1854 finalmsg = msg % (nodemod.short(n),
1848 phases.phasenames[actualphase],
1855 phases.phasenames[actualphase],
1849 phases.phasenames[expectedphase])
1856 phases.phasenames[expectedphase])
1850 raise error.PushRaced(finalmsg)
1857 raise error.PushRaced(finalmsg)
1851
1858
1852 @parthandler('output')
1859 @parthandler('output')
1853 def handleoutput(op, inpart):
1860 def handleoutput(op, inpart):
1854 """forward output captured on the server to the client"""
1861 """forward output captured on the server to the client"""
1855 for line in inpart.read().splitlines():
1862 for line in inpart.read().splitlines():
1856 op.ui.status(_('remote: %s\n') % line)
1863 op.ui.status(_('remote: %s\n') % line)
1857
1864
1858 @parthandler('replycaps')
1865 @parthandler('replycaps')
1859 def handlereplycaps(op, inpart):
1866 def handlereplycaps(op, inpart):
1860 """Notify that a reply bundle should be created
1867 """Notify that a reply bundle should be created
1861
1868
1862 The payload contains the capabilities information for the reply"""
1869 The payload contains the capabilities information for the reply"""
1863 caps = decodecaps(inpart.read())
1870 caps = decodecaps(inpart.read())
1864 if op.reply is None:
1871 if op.reply is None:
1865 op.reply = bundle20(op.ui, caps)
1872 op.reply = bundle20(op.ui, caps)
1866
1873
1867 class AbortFromPart(error.Abort):
1874 class AbortFromPart(error.Abort):
1868 """Sub-class of Abort that denotes an error from a bundle2 part."""
1875 """Sub-class of Abort that denotes an error from a bundle2 part."""
1869
1876
1870 @parthandler('error:abort', ('message', 'hint'))
1877 @parthandler('error:abort', ('message', 'hint'))
1871 def handleerrorabort(op, inpart):
1878 def handleerrorabort(op, inpart):
1872 """Used to transmit abort error over the wire"""
1879 """Used to transmit abort error over the wire"""
1873 raise AbortFromPart(inpart.params['message'],
1880 raise AbortFromPart(inpart.params['message'],
1874 hint=inpart.params.get('hint'))
1881 hint=inpart.params.get('hint'))
1875
1882
1876 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1883 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1877 'in-reply-to'))
1884 'in-reply-to'))
1878 def handleerrorpushkey(op, inpart):
1885 def handleerrorpushkey(op, inpart):
1879 """Used to transmit failure of a mandatory pushkey over the wire"""
1886 """Used to transmit failure of a mandatory pushkey over the wire"""
1880 kwargs = {}
1887 kwargs = {}
1881 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1888 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1882 value = inpart.params.get(name)
1889 value = inpart.params.get(name)
1883 if value is not None:
1890 if value is not None:
1884 kwargs[name] = value
1891 kwargs[name] = value
1885 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1892 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1886
1893
1887 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1894 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1888 def handleerrorunsupportedcontent(op, inpart):
1895 def handleerrorunsupportedcontent(op, inpart):
1889 """Used to transmit unknown content error over the wire"""
1896 """Used to transmit unknown content error over the wire"""
1890 kwargs = {}
1897 kwargs = {}
1891 parttype = inpart.params.get('parttype')
1898 parttype = inpart.params.get('parttype')
1892 if parttype is not None:
1899 if parttype is not None:
1893 kwargs['parttype'] = parttype
1900 kwargs['parttype'] = parttype
1894 params = inpart.params.get('params')
1901 params = inpart.params.get('params')
1895 if params is not None:
1902 if params is not None:
1896 kwargs['params'] = params.split('\0')
1903 kwargs['params'] = params.split('\0')
1897
1904
1898 raise error.BundleUnknownFeatureError(**kwargs)
1905 raise error.BundleUnknownFeatureError(**kwargs)
1899
1906
1900 @parthandler('error:pushraced', ('message',))
1907 @parthandler('error:pushraced', ('message',))
1901 def handleerrorpushraced(op, inpart):
1908 def handleerrorpushraced(op, inpart):
1902 """Used to transmit push race error over the wire"""
1909 """Used to transmit push race error over the wire"""
1903 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1910 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1904
1911
1905 @parthandler('listkeys', ('namespace',))
1912 @parthandler('listkeys', ('namespace',))
1906 def handlelistkeys(op, inpart):
1913 def handlelistkeys(op, inpart):
1907 """retrieve pushkey namespace content stored in a bundle2"""
1914 """retrieve pushkey namespace content stored in a bundle2"""
1908 namespace = inpart.params['namespace']
1915 namespace = inpart.params['namespace']
1909 r = pushkey.decodekeys(inpart.read())
1916 r = pushkey.decodekeys(inpart.read())
1910 op.records.add('listkeys', (namespace, r))
1917 op.records.add('listkeys', (namespace, r))
1911
1918
1912 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1919 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1913 def handlepushkey(op, inpart):
1920 def handlepushkey(op, inpart):
1914 """process a pushkey request"""
1921 """process a pushkey request"""
1915 dec = pushkey.decode
1922 dec = pushkey.decode
1916 namespace = dec(inpart.params['namespace'])
1923 namespace = dec(inpart.params['namespace'])
1917 key = dec(inpart.params['key'])
1924 key = dec(inpart.params['key'])
1918 old = dec(inpart.params['old'])
1925 old = dec(inpart.params['old'])
1919 new = dec(inpart.params['new'])
1926 new = dec(inpart.params['new'])
1920 # Grab the transaction to ensure that we have the lock before performing the
1927 # Grab the transaction to ensure that we have the lock before performing the
1921 # pushkey.
1928 # pushkey.
1922 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1929 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1923 op.gettransaction()
1930 op.gettransaction()
1924 ret = op.repo.pushkey(namespace, key, old, new)
1931 ret = op.repo.pushkey(namespace, key, old, new)
1925 record = {'namespace': namespace,
1932 record = {'namespace': namespace,
1926 'key': key,
1933 'key': key,
1927 'old': old,
1934 'old': old,
1928 'new': new}
1935 'new': new}
1929 op.records.add('pushkey', record)
1936 op.records.add('pushkey', record)
1930 if op.reply is not None:
1937 if op.reply is not None:
1931 rpart = op.reply.newpart('reply:pushkey')
1938 rpart = op.reply.newpart('reply:pushkey')
1932 rpart.addparam(
1939 rpart.addparam(
1933 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1940 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1934 rpart.addparam('return', '%i' % ret, mandatory=False)
1941 rpart.addparam('return', '%i' % ret, mandatory=False)
1935 if inpart.mandatory and not ret:
1942 if inpart.mandatory and not ret:
1936 kwargs = {}
1943 kwargs = {}
1937 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1944 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1938 if key in inpart.params:
1945 if key in inpart.params:
1939 kwargs[key] = inpart.params[key]
1946 kwargs[key] = inpart.params[key]
1940 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1947 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1941
1948
1942 @parthandler('phase-heads')
1949 @parthandler('phase-heads')
1943 def handlephases(op, inpart):
1950 def handlephases(op, inpart):
1944 """apply phases from bundle part to repo"""
1951 """apply phases from bundle part to repo"""
1945 headsbyphase = phases.binarydecode(inpart)
1952 headsbyphase = phases.binarydecode(inpart)
1946 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
1953 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
1947
1954
1948 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1955 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1949 def handlepushkeyreply(op, inpart):
1956 def handlepushkeyreply(op, inpart):
1950 """retrieve the result of a pushkey request"""
1957 """retrieve the result of a pushkey request"""
1951 ret = int(inpart.params['return'])
1958 ret = int(inpart.params['return'])
1952 partid = int(inpart.params['in-reply-to'])
1959 partid = int(inpart.params['in-reply-to'])
1953 op.records.add('pushkey', {'return': ret}, partid)
1960 op.records.add('pushkey', {'return': ret}, partid)
1954
1961
1955 @parthandler('obsmarkers')
1962 @parthandler('obsmarkers')
1956 def handleobsmarker(op, inpart):
1963 def handleobsmarker(op, inpart):
1957 """add a stream of obsmarkers to the repo"""
1964 """add a stream of obsmarkers to the repo"""
1958 tr = op.gettransaction()
1965 tr = op.gettransaction()
1959 markerdata = inpart.read()
1966 markerdata = inpart.read()
1960 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
1967 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
1961 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1968 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1962 % len(markerdata))
1969 % len(markerdata))
1963 # The mergemarkers call will crash if marker creation is not enabled.
1970 # The mergemarkers call will crash if marker creation is not enabled.
1964 # we want to avoid this if the part is advisory.
1971 # we want to avoid this if the part is advisory.
1965 if not inpart.mandatory and op.repo.obsstore.readonly:
1972 if not inpart.mandatory and op.repo.obsstore.readonly:
1966 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1973 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1967 return
1974 return
1968 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1975 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1969 op.repo.invalidatevolatilesets()
1976 op.repo.invalidatevolatilesets()
1970 if new:
1977 if new:
1971 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1978 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1972 op.records.add('obsmarkers', {'new': new})
1979 op.records.add('obsmarkers', {'new': new})
1973 if op.reply is not None:
1980 if op.reply is not None:
1974 rpart = op.reply.newpart('reply:obsmarkers')
1981 rpart = op.reply.newpart('reply:obsmarkers')
1975 rpart.addparam(
1982 rpart.addparam(
1976 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1983 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1977 rpart.addparam('new', '%i' % new, mandatory=False)
1984 rpart.addparam('new', '%i' % new, mandatory=False)
1978
1985
1979
1986
1980 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1987 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1981 def handleobsmarkerreply(op, inpart):
1988 def handleobsmarkerreply(op, inpart):
1982 """retrieve the result of a pushkey request"""
1989 """retrieve the result of a pushkey request"""
1983 ret = int(inpart.params['new'])
1990 ret = int(inpart.params['new'])
1984 partid = int(inpart.params['in-reply-to'])
1991 partid = int(inpart.params['in-reply-to'])
1985 op.records.add('obsmarkers', {'new': ret}, partid)
1992 op.records.add('obsmarkers', {'new': ret}, partid)
1986
1993
1987 @parthandler('hgtagsfnodes')
1994 @parthandler('hgtagsfnodes')
1988 def handlehgtagsfnodes(op, inpart):
1995 def handlehgtagsfnodes(op, inpart):
1989 """Applies .hgtags fnodes cache entries to the local repo.
1996 """Applies .hgtags fnodes cache entries to the local repo.
1990
1997
1991 Payload is pairs of 20 byte changeset nodes and filenodes.
1998 Payload is pairs of 20 byte changeset nodes and filenodes.
1992 """
1999 """
1993 # Grab the transaction so we ensure that we have the lock at this point.
2000 # Grab the transaction so we ensure that we have the lock at this point.
1994 if op.ui.configbool('experimental', 'bundle2lazylocking'):
2001 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1995 op.gettransaction()
2002 op.gettransaction()
1996 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
2003 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1997
2004
1998 count = 0
2005 count = 0
1999 while True:
2006 while True:
2000 node = inpart.read(20)
2007 node = inpart.read(20)
2001 fnode = inpart.read(20)
2008 fnode = inpart.read(20)
2002 if len(node) < 20 or len(fnode) < 20:
2009 if len(node) < 20 or len(fnode) < 20:
2003 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
2010 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
2004 break
2011 break
2005 cache.setfnode(node, fnode)
2012 cache.setfnode(node, fnode)
2006 count += 1
2013 count += 1
2007
2014
2008 cache.write()
2015 cache.write()
2009 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
2016 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
2010
2017
2011 @parthandler('pushvars')
2018 @parthandler('pushvars')
2012 def bundle2getvars(op, part):
2019 def bundle2getvars(op, part):
2013 '''unbundle a bundle2 containing shellvars on the server'''
2020 '''unbundle a bundle2 containing shellvars on the server'''
2014 # An option to disable unbundling on server-side for security reasons
2021 # An option to disable unbundling on server-side for security reasons
2015 if op.ui.configbool('push', 'pushvars.server'):
2022 if op.ui.configbool('push', 'pushvars.server'):
2016 hookargs = {}
2023 hookargs = {}
2017 for key, value in part.advisoryparams:
2024 for key, value in part.advisoryparams:
2018 key = key.upper()
2025 key = key.upper()
2019 # We want pushed variables to have USERVAR_ prepended so we know
2026 # We want pushed variables to have USERVAR_ prepended so we know
2020 # they came from the --pushvar flag.
2027 # they came from the --pushvar flag.
2021 key = "USERVAR_" + key
2028 key = "USERVAR_" + key
2022 hookargs[key] = value
2029 hookargs[key] = value
2023 op.addhookargs(hookargs)
2030 op.addhookargs(hookargs)
General Comments 0
You need to be logged in to leave comments. Login now