##// END OF EJS Templates
bundle2: add documention to 'part.addparams'...
Pierre-Yves David -
r31861:6d055cd6 default
parent child Browse files
Show More
@@ -1,1626 +1,1633
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part header. When the header is empty
69 The total number of Bytes used by the part header. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 from __future__ import absolute_import
148 from __future__ import absolute_import
149
149
150 import errno
150 import errno
151 import re
151 import re
152 import string
152 import string
153 import struct
153 import struct
154 import sys
154 import sys
155
155
156 from .i18n import _
156 from .i18n import _
157 from . import (
157 from . import (
158 changegroup,
158 changegroup,
159 error,
159 error,
160 obsolete,
160 obsolete,
161 pushkey,
161 pushkey,
162 pycompat,
162 pycompat,
163 tags,
163 tags,
164 url,
164 url,
165 util,
165 util,
166 )
166 )
167
167
168 urlerr = util.urlerr
168 urlerr = util.urlerr
169 urlreq = util.urlreq
169 urlreq = util.urlreq
170
170
171 _pack = struct.pack
171 _pack = struct.pack
172 _unpack = struct.unpack
172 _unpack = struct.unpack
173
173
174 _fstreamparamsize = '>i'
174 _fstreamparamsize = '>i'
175 _fpartheadersize = '>i'
175 _fpartheadersize = '>i'
176 _fparttypesize = '>B'
176 _fparttypesize = '>B'
177 _fpartid = '>I'
177 _fpartid = '>I'
178 _fpayloadsize = '>i'
178 _fpayloadsize = '>i'
179 _fpartparamcount = '>BB'
179 _fpartparamcount = '>BB'
180
180
181 preferedchunksize = 4096
181 preferedchunksize = 4096
182
182
183 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
183 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
184
184
185 def outdebug(ui, message):
185 def outdebug(ui, message):
186 """debug regarding output stream (bundling)"""
186 """debug regarding output stream (bundling)"""
187 if ui.configbool('devel', 'bundle2.debug', False):
187 if ui.configbool('devel', 'bundle2.debug', False):
188 ui.debug('bundle2-output: %s\n' % message)
188 ui.debug('bundle2-output: %s\n' % message)
189
189
190 def indebug(ui, message):
190 def indebug(ui, message):
191 """debug on input stream (unbundling)"""
191 """debug on input stream (unbundling)"""
192 if ui.configbool('devel', 'bundle2.debug', False):
192 if ui.configbool('devel', 'bundle2.debug', False):
193 ui.debug('bundle2-input: %s\n' % message)
193 ui.debug('bundle2-input: %s\n' % message)
194
194
195 def validateparttype(parttype):
195 def validateparttype(parttype):
196 """raise ValueError if a parttype contains invalid character"""
196 """raise ValueError if a parttype contains invalid character"""
197 if _parttypeforbidden.search(parttype):
197 if _parttypeforbidden.search(parttype):
198 raise ValueError(parttype)
198 raise ValueError(parttype)
199
199
200 def _makefpartparamsizes(nbparams):
200 def _makefpartparamsizes(nbparams):
201 """return a struct format to read part parameter sizes
201 """return a struct format to read part parameter sizes
202
202
203 The number parameters is variable so we need to build that format
203 The number parameters is variable so we need to build that format
204 dynamically.
204 dynamically.
205 """
205 """
206 return '>'+('BB'*nbparams)
206 return '>'+('BB'*nbparams)
207
207
208 parthandlermapping = {}
208 parthandlermapping = {}
209
209
210 def parthandler(parttype, params=()):
210 def parthandler(parttype, params=()):
211 """decorator that register a function as a bundle2 part handler
211 """decorator that register a function as a bundle2 part handler
212
212
213 eg::
213 eg::
214
214
215 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
215 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
216 def myparttypehandler(...):
216 def myparttypehandler(...):
217 '''process a part of type "my part".'''
217 '''process a part of type "my part".'''
218 ...
218 ...
219 """
219 """
220 validateparttype(parttype)
220 validateparttype(parttype)
221 def _decorator(func):
221 def _decorator(func):
222 lparttype = parttype.lower() # enforce lower case matching.
222 lparttype = parttype.lower() # enforce lower case matching.
223 assert lparttype not in parthandlermapping
223 assert lparttype not in parthandlermapping
224 parthandlermapping[lparttype] = func
224 parthandlermapping[lparttype] = func
225 func.params = frozenset(params)
225 func.params = frozenset(params)
226 return func
226 return func
227 return _decorator
227 return _decorator
228
228
229 class unbundlerecords(object):
229 class unbundlerecords(object):
230 """keep record of what happens during and unbundle
230 """keep record of what happens during and unbundle
231
231
232 New records are added using `records.add('cat', obj)`. Where 'cat' is a
232 New records are added using `records.add('cat', obj)`. Where 'cat' is a
233 category of record and obj is an arbitrary object.
233 category of record and obj is an arbitrary object.
234
234
235 `records['cat']` will return all entries of this category 'cat'.
235 `records['cat']` will return all entries of this category 'cat'.
236
236
237 Iterating on the object itself will yield `('category', obj)` tuples
237 Iterating on the object itself will yield `('category', obj)` tuples
238 for all entries.
238 for all entries.
239
239
240 All iterations happens in chronological order.
240 All iterations happens in chronological order.
241 """
241 """
242
242
243 def __init__(self):
243 def __init__(self):
244 self._categories = {}
244 self._categories = {}
245 self._sequences = []
245 self._sequences = []
246 self._replies = {}
246 self._replies = {}
247
247
248 def add(self, category, entry, inreplyto=None):
248 def add(self, category, entry, inreplyto=None):
249 """add a new record of a given category.
249 """add a new record of a given category.
250
250
251 The entry can then be retrieved in the list returned by
251 The entry can then be retrieved in the list returned by
252 self['category']."""
252 self['category']."""
253 self._categories.setdefault(category, []).append(entry)
253 self._categories.setdefault(category, []).append(entry)
254 self._sequences.append((category, entry))
254 self._sequences.append((category, entry))
255 if inreplyto is not None:
255 if inreplyto is not None:
256 self.getreplies(inreplyto).add(category, entry)
256 self.getreplies(inreplyto).add(category, entry)
257
257
258 def getreplies(self, partid):
258 def getreplies(self, partid):
259 """get the records that are replies to a specific part"""
259 """get the records that are replies to a specific part"""
260 return self._replies.setdefault(partid, unbundlerecords())
260 return self._replies.setdefault(partid, unbundlerecords())
261
261
262 def __getitem__(self, cat):
262 def __getitem__(self, cat):
263 return tuple(self._categories.get(cat, ()))
263 return tuple(self._categories.get(cat, ()))
264
264
265 def __iter__(self):
265 def __iter__(self):
266 return iter(self._sequences)
266 return iter(self._sequences)
267
267
268 def __len__(self):
268 def __len__(self):
269 return len(self._sequences)
269 return len(self._sequences)
270
270
271 def __nonzero__(self):
271 def __nonzero__(self):
272 return bool(self._sequences)
272 return bool(self._sequences)
273
273
274 __bool__ = __nonzero__
274 __bool__ = __nonzero__
275
275
276 class bundleoperation(object):
276 class bundleoperation(object):
277 """an object that represents a single bundling process
277 """an object that represents a single bundling process
278
278
279 Its purpose is to carry unbundle-related objects and states.
279 Its purpose is to carry unbundle-related objects and states.
280
280
281 A new object should be created at the beginning of each bundle processing.
281 A new object should be created at the beginning of each bundle processing.
282 The object is to be returned by the processing function.
282 The object is to be returned by the processing function.
283
283
284 The object has very little content now it will ultimately contain:
284 The object has very little content now it will ultimately contain:
285 * an access to the repo the bundle is applied to,
285 * an access to the repo the bundle is applied to,
286 * a ui object,
286 * a ui object,
287 * a way to retrieve a transaction to add changes to the repo,
287 * a way to retrieve a transaction to add changes to the repo,
288 * a way to record the result of processing each part,
288 * a way to record the result of processing each part,
289 * a way to construct a bundle response when applicable.
289 * a way to construct a bundle response when applicable.
290 """
290 """
291
291
292 def __init__(self, repo, transactiongetter, captureoutput=True):
292 def __init__(self, repo, transactiongetter, captureoutput=True):
293 self.repo = repo
293 self.repo = repo
294 self.ui = repo.ui
294 self.ui = repo.ui
295 self.records = unbundlerecords()
295 self.records = unbundlerecords()
296 self.gettransaction = transactiongetter
296 self.gettransaction = transactiongetter
297 self.reply = None
297 self.reply = None
298 self.captureoutput = captureoutput
298 self.captureoutput = captureoutput
299
299
300 class TransactionUnavailable(RuntimeError):
300 class TransactionUnavailable(RuntimeError):
301 pass
301 pass
302
302
303 def _notransaction():
303 def _notransaction():
304 """default method to get a transaction while processing a bundle
304 """default method to get a transaction while processing a bundle
305
305
306 Raise an exception to highlight the fact that no transaction was expected
306 Raise an exception to highlight the fact that no transaction was expected
307 to be created"""
307 to be created"""
308 raise TransactionUnavailable()
308 raise TransactionUnavailable()
309
309
310 def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
310 def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
311 # transform me into unbundler.apply() as soon as the freeze is lifted
311 # transform me into unbundler.apply() as soon as the freeze is lifted
312 tr.hookargs['bundle2'] = '1'
312 tr.hookargs['bundle2'] = '1'
313 if source is not None and 'source' not in tr.hookargs:
313 if source is not None and 'source' not in tr.hookargs:
314 tr.hookargs['source'] = source
314 tr.hookargs['source'] = source
315 if url is not None and 'url' not in tr.hookargs:
315 if url is not None and 'url' not in tr.hookargs:
316 tr.hookargs['url'] = url
316 tr.hookargs['url'] = url
317 return processbundle(repo, unbundler, lambda: tr, op=op)
317 return processbundle(repo, unbundler, lambda: tr, op=op)
318
318
319 def processbundle(repo, unbundler, transactiongetter=None, op=None):
319 def processbundle(repo, unbundler, transactiongetter=None, op=None):
320 """This function process a bundle, apply effect to/from a repo
320 """This function process a bundle, apply effect to/from a repo
321
321
322 It iterates over each part then searches for and uses the proper handling
322 It iterates over each part then searches for and uses the proper handling
323 code to process the part. Parts are processed in order.
323 code to process the part. Parts are processed in order.
324
324
325 Unknown Mandatory part will abort the process.
325 Unknown Mandatory part will abort the process.
326
326
327 It is temporarily possible to provide a prebuilt bundleoperation to the
327 It is temporarily possible to provide a prebuilt bundleoperation to the
328 function. This is used to ensure output is properly propagated in case of
328 function. This is used to ensure output is properly propagated in case of
329 an error during the unbundling. This output capturing part will likely be
329 an error during the unbundling. This output capturing part will likely be
330 reworked and this ability will probably go away in the process.
330 reworked and this ability will probably go away in the process.
331 """
331 """
332 if op is None:
332 if op is None:
333 if transactiongetter is None:
333 if transactiongetter is None:
334 transactiongetter = _notransaction
334 transactiongetter = _notransaction
335 op = bundleoperation(repo, transactiongetter)
335 op = bundleoperation(repo, transactiongetter)
336 # todo:
336 # todo:
337 # - replace this is a init function soon.
337 # - replace this is a init function soon.
338 # - exception catching
338 # - exception catching
339 unbundler.params
339 unbundler.params
340 if repo.ui.debugflag:
340 if repo.ui.debugflag:
341 msg = ['bundle2-input-bundle:']
341 msg = ['bundle2-input-bundle:']
342 if unbundler.params:
342 if unbundler.params:
343 msg.append(' %i params')
343 msg.append(' %i params')
344 if op.gettransaction is None:
344 if op.gettransaction is None:
345 msg.append(' no-transaction')
345 msg.append(' no-transaction')
346 else:
346 else:
347 msg.append(' with-transaction')
347 msg.append(' with-transaction')
348 msg.append('\n')
348 msg.append('\n')
349 repo.ui.debug(''.join(msg))
349 repo.ui.debug(''.join(msg))
350 iterparts = enumerate(unbundler.iterparts())
350 iterparts = enumerate(unbundler.iterparts())
351 part = None
351 part = None
352 nbpart = 0
352 nbpart = 0
353 try:
353 try:
354 for nbpart, part in iterparts:
354 for nbpart, part in iterparts:
355 _processpart(op, part)
355 _processpart(op, part)
356 except Exception as exc:
356 except Exception as exc:
357 for nbpart, part in iterparts:
357 for nbpart, part in iterparts:
358 # consume the bundle content
358 # consume the bundle content
359 part.seek(0, 2)
359 part.seek(0, 2)
360 # Small hack to let caller code distinguish exceptions from bundle2
360 # Small hack to let caller code distinguish exceptions from bundle2
361 # processing from processing the old format. This is mostly
361 # processing from processing the old format. This is mostly
362 # needed to handle different return codes to unbundle according to the
362 # needed to handle different return codes to unbundle according to the
363 # type of bundle. We should probably clean up or drop this return code
363 # type of bundle. We should probably clean up or drop this return code
364 # craziness in a future version.
364 # craziness in a future version.
365 exc.duringunbundle2 = True
365 exc.duringunbundle2 = True
366 salvaged = []
366 salvaged = []
367 replycaps = None
367 replycaps = None
368 if op.reply is not None:
368 if op.reply is not None:
369 salvaged = op.reply.salvageoutput()
369 salvaged = op.reply.salvageoutput()
370 replycaps = op.reply.capabilities
370 replycaps = op.reply.capabilities
371 exc._replycaps = replycaps
371 exc._replycaps = replycaps
372 exc._bundle2salvagedoutput = salvaged
372 exc._bundle2salvagedoutput = salvaged
373 raise
373 raise
374 finally:
374 finally:
375 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
375 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
376
376
377 return op
377 return op
378
378
379 def _processpart(op, part):
379 def _processpart(op, part):
380 """process a single part from a bundle
380 """process a single part from a bundle
381
381
382 The part is guaranteed to have been fully consumed when the function exits
382 The part is guaranteed to have been fully consumed when the function exits
383 (even if an exception is raised)."""
383 (even if an exception is raised)."""
384 status = 'unknown' # used by debug output
384 status = 'unknown' # used by debug output
385 hardabort = False
385 hardabort = False
386 try:
386 try:
387 try:
387 try:
388 handler = parthandlermapping.get(part.type)
388 handler = parthandlermapping.get(part.type)
389 if handler is None:
389 if handler is None:
390 status = 'unsupported-type'
390 status = 'unsupported-type'
391 raise error.BundleUnknownFeatureError(parttype=part.type)
391 raise error.BundleUnknownFeatureError(parttype=part.type)
392 indebug(op.ui, 'found a handler for part %r' % part.type)
392 indebug(op.ui, 'found a handler for part %r' % part.type)
393 unknownparams = part.mandatorykeys - handler.params
393 unknownparams = part.mandatorykeys - handler.params
394 if unknownparams:
394 if unknownparams:
395 unknownparams = list(unknownparams)
395 unknownparams = list(unknownparams)
396 unknownparams.sort()
396 unknownparams.sort()
397 status = 'unsupported-params (%s)' % unknownparams
397 status = 'unsupported-params (%s)' % unknownparams
398 raise error.BundleUnknownFeatureError(parttype=part.type,
398 raise error.BundleUnknownFeatureError(parttype=part.type,
399 params=unknownparams)
399 params=unknownparams)
400 status = 'supported'
400 status = 'supported'
401 except error.BundleUnknownFeatureError as exc:
401 except error.BundleUnknownFeatureError as exc:
402 if part.mandatory: # mandatory parts
402 if part.mandatory: # mandatory parts
403 raise
403 raise
404 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
404 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
405 return # skip to part processing
405 return # skip to part processing
406 finally:
406 finally:
407 if op.ui.debugflag:
407 if op.ui.debugflag:
408 msg = ['bundle2-input-part: "%s"' % part.type]
408 msg = ['bundle2-input-part: "%s"' % part.type]
409 if not part.mandatory:
409 if not part.mandatory:
410 msg.append(' (advisory)')
410 msg.append(' (advisory)')
411 nbmp = len(part.mandatorykeys)
411 nbmp = len(part.mandatorykeys)
412 nbap = len(part.params) - nbmp
412 nbap = len(part.params) - nbmp
413 if nbmp or nbap:
413 if nbmp or nbap:
414 msg.append(' (params:')
414 msg.append(' (params:')
415 if nbmp:
415 if nbmp:
416 msg.append(' %i mandatory' % nbmp)
416 msg.append(' %i mandatory' % nbmp)
417 if nbap:
417 if nbap:
418 msg.append(' %i advisory' % nbmp)
418 msg.append(' %i advisory' % nbmp)
419 msg.append(')')
419 msg.append(')')
420 msg.append(' %s\n' % status)
420 msg.append(' %s\n' % status)
421 op.ui.debug(''.join(msg))
421 op.ui.debug(''.join(msg))
422
422
423 # handler is called outside the above try block so that we don't
423 # handler is called outside the above try block so that we don't
424 # risk catching KeyErrors from anything other than the
424 # risk catching KeyErrors from anything other than the
425 # parthandlermapping lookup (any KeyError raised by handler()
425 # parthandlermapping lookup (any KeyError raised by handler()
426 # itself represents a defect of a different variety).
426 # itself represents a defect of a different variety).
427 output = None
427 output = None
428 if op.captureoutput and op.reply is not None:
428 if op.captureoutput and op.reply is not None:
429 op.ui.pushbuffer(error=True, subproc=True)
429 op.ui.pushbuffer(error=True, subproc=True)
430 output = ''
430 output = ''
431 try:
431 try:
432 handler(op, part)
432 handler(op, part)
433 finally:
433 finally:
434 if output is not None:
434 if output is not None:
435 output = op.ui.popbuffer()
435 output = op.ui.popbuffer()
436 if output:
436 if output:
437 outpart = op.reply.newpart('output', data=output,
437 outpart = op.reply.newpart('output', data=output,
438 mandatory=False)
438 mandatory=False)
439 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
439 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
440 # If exiting or interrupted, do not attempt to seek the stream in the
440 # If exiting or interrupted, do not attempt to seek the stream in the
441 # finally block below. This makes abort faster.
441 # finally block below. This makes abort faster.
442 except (SystemExit, KeyboardInterrupt):
442 except (SystemExit, KeyboardInterrupt):
443 hardabort = True
443 hardabort = True
444 raise
444 raise
445 finally:
445 finally:
446 # consume the part content to not corrupt the stream.
446 # consume the part content to not corrupt the stream.
447 if not hardabort:
447 if not hardabort:
448 part.seek(0, 2)
448 part.seek(0, 2)
449
449
450
450
451 def decodecaps(blob):
451 def decodecaps(blob):
452 """decode a bundle2 caps bytes blob into a dictionary
452 """decode a bundle2 caps bytes blob into a dictionary
453
453
454 The blob is a list of capabilities (one per line)
454 The blob is a list of capabilities (one per line)
455 Capabilities may have values using a line of the form::
455 Capabilities may have values using a line of the form::
456
456
457 capability=value1,value2,value3
457 capability=value1,value2,value3
458
458
459 The values are always a list."""
459 The values are always a list."""
460 caps = {}
460 caps = {}
461 for line in blob.splitlines():
461 for line in blob.splitlines():
462 if not line:
462 if not line:
463 continue
463 continue
464 if '=' not in line:
464 if '=' not in line:
465 key, vals = line, ()
465 key, vals = line, ()
466 else:
466 else:
467 key, vals = line.split('=', 1)
467 key, vals = line.split('=', 1)
468 vals = vals.split(',')
468 vals = vals.split(',')
469 key = urlreq.unquote(key)
469 key = urlreq.unquote(key)
470 vals = [urlreq.unquote(v) for v in vals]
470 vals = [urlreq.unquote(v) for v in vals]
471 caps[key] = vals
471 caps[key] = vals
472 return caps
472 return caps
473
473
474 def encodecaps(caps):
474 def encodecaps(caps):
475 """encode a bundle2 caps dictionary into a bytes blob"""
475 """encode a bundle2 caps dictionary into a bytes blob"""
476 chunks = []
476 chunks = []
477 for ca in sorted(caps):
477 for ca in sorted(caps):
478 vals = caps[ca]
478 vals = caps[ca]
479 ca = urlreq.quote(ca)
479 ca = urlreq.quote(ca)
480 vals = [urlreq.quote(v) for v in vals]
480 vals = [urlreq.quote(v) for v in vals]
481 if vals:
481 if vals:
482 ca = "%s=%s" % (ca, ','.join(vals))
482 ca = "%s=%s" % (ca, ','.join(vals))
483 chunks.append(ca)
483 chunks.append(ca)
484 return '\n'.join(chunks)
484 return '\n'.join(chunks)
485
485
486 bundletypes = {
486 bundletypes = {
487 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
487 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
488 # since the unification ssh accepts a header but there
488 # since the unification ssh accepts a header but there
489 # is no capability signaling it.
489 # is no capability signaling it.
490 "HG20": (), # special-cased below
490 "HG20": (), # special-cased below
491 "HG10UN": ("HG10UN", 'UN'),
491 "HG10UN": ("HG10UN", 'UN'),
492 "HG10BZ": ("HG10", 'BZ'),
492 "HG10BZ": ("HG10", 'BZ'),
493 "HG10GZ": ("HG10GZ", 'GZ'),
493 "HG10GZ": ("HG10GZ", 'GZ'),
494 }
494 }
495
495
496 # hgweb uses this list to communicate its preferred type
496 # hgweb uses this list to communicate its preferred type
497 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
497 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
498
498
499 class bundle20(object):
499 class bundle20(object):
500 """represent an outgoing bundle2 container
500 """represent an outgoing bundle2 container
501
501
502 Use the `addparam` method to add stream level parameter. and `newpart` to
502 Use the `addparam` method to add stream level parameter. and `newpart` to
503 populate it. Then call `getchunks` to retrieve all the binary chunks of
503 populate it. Then call `getchunks` to retrieve all the binary chunks of
504 data that compose the bundle2 container."""
504 data that compose the bundle2 container."""
505
505
506 _magicstring = 'HG20'
506 _magicstring = 'HG20'
507
507
508 def __init__(self, ui, capabilities=()):
508 def __init__(self, ui, capabilities=()):
509 self.ui = ui
509 self.ui = ui
510 self._params = []
510 self._params = []
511 self._parts = []
511 self._parts = []
512 self.capabilities = dict(capabilities)
512 self.capabilities = dict(capabilities)
513 self._compengine = util.compengines.forbundletype('UN')
513 self._compengine = util.compengines.forbundletype('UN')
514 self._compopts = None
514 self._compopts = None
515
515
516 def setcompression(self, alg, compopts=None):
516 def setcompression(self, alg, compopts=None):
517 """setup core part compression to <alg>"""
517 """setup core part compression to <alg>"""
518 if alg in (None, 'UN'):
518 if alg in (None, 'UN'):
519 return
519 return
520 assert not any(n.lower() == 'compression' for n, v in self._params)
520 assert not any(n.lower() == 'compression' for n, v in self._params)
521 self.addparam('Compression', alg)
521 self.addparam('Compression', alg)
522 self._compengine = util.compengines.forbundletype(alg)
522 self._compengine = util.compengines.forbundletype(alg)
523 self._compopts = compopts
523 self._compopts = compopts
524
524
525 @property
525 @property
526 def nbparts(self):
526 def nbparts(self):
527 """total number of parts added to the bundler"""
527 """total number of parts added to the bundler"""
528 return len(self._parts)
528 return len(self._parts)
529
529
530 # methods used to defines the bundle2 content
530 # methods used to defines the bundle2 content
531 def addparam(self, name, value=None):
531 def addparam(self, name, value=None):
532 """add a stream level parameter"""
532 """add a stream level parameter"""
533 if not name:
533 if not name:
534 raise ValueError('empty parameter name')
534 raise ValueError('empty parameter name')
535 if name[0] not in string.letters:
535 if name[0] not in string.letters:
536 raise ValueError('non letter first character: %r' % name)
536 raise ValueError('non letter first character: %r' % name)
537 self._params.append((name, value))
537 self._params.append((name, value))
538
538
539 def addpart(self, part):
539 def addpart(self, part):
540 """add a new part to the bundle2 container
540 """add a new part to the bundle2 container
541
541
542 Parts contains the actual applicative payload."""
542 Parts contains the actual applicative payload."""
543 assert part.id is None
543 assert part.id is None
544 part.id = len(self._parts) # very cheap counter
544 part.id = len(self._parts) # very cheap counter
545 self._parts.append(part)
545 self._parts.append(part)
546
546
547 def newpart(self, typeid, *args, **kwargs):
547 def newpart(self, typeid, *args, **kwargs):
548 """create a new part and add it to the containers
548 """create a new part and add it to the containers
549
549
550 As the part is directly added to the containers. For now, this means
550 As the part is directly added to the containers. For now, this means
551 that any failure to properly initialize the part after calling
551 that any failure to properly initialize the part after calling
552 ``newpart`` should result in a failure of the whole bundling process.
552 ``newpart`` should result in a failure of the whole bundling process.
553
553
554 You can still fall back to manually create and add if you need better
554 You can still fall back to manually create and add if you need better
555 control."""
555 control."""
556 part = bundlepart(typeid, *args, **kwargs)
556 part = bundlepart(typeid, *args, **kwargs)
557 self.addpart(part)
557 self.addpart(part)
558 return part
558 return part
559
559
560 # methods used to generate the bundle2 stream
560 # methods used to generate the bundle2 stream
561 def getchunks(self):
561 def getchunks(self):
562 if self.ui.debugflag:
562 if self.ui.debugflag:
563 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
563 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
564 if self._params:
564 if self._params:
565 msg.append(' (%i params)' % len(self._params))
565 msg.append(' (%i params)' % len(self._params))
566 msg.append(' %i parts total\n' % len(self._parts))
566 msg.append(' %i parts total\n' % len(self._parts))
567 self.ui.debug(''.join(msg))
567 self.ui.debug(''.join(msg))
568 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
568 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
569 yield self._magicstring
569 yield self._magicstring
570 param = self._paramchunk()
570 param = self._paramchunk()
571 outdebug(self.ui, 'bundle parameter: %s' % param)
571 outdebug(self.ui, 'bundle parameter: %s' % param)
572 yield _pack(_fstreamparamsize, len(param))
572 yield _pack(_fstreamparamsize, len(param))
573 if param:
573 if param:
574 yield param
574 yield param
575 for chunk in self._compengine.compressstream(self._getcorechunk(),
575 for chunk in self._compengine.compressstream(self._getcorechunk(),
576 self._compopts):
576 self._compopts):
577 yield chunk
577 yield chunk
578
578
579 def _paramchunk(self):
579 def _paramchunk(self):
580 """return a encoded version of all stream parameters"""
580 """return a encoded version of all stream parameters"""
581 blocks = []
581 blocks = []
582 for par, value in self._params:
582 for par, value in self._params:
583 par = urlreq.quote(par)
583 par = urlreq.quote(par)
584 if value is not None:
584 if value is not None:
585 value = urlreq.quote(value)
585 value = urlreq.quote(value)
586 par = '%s=%s' % (par, value)
586 par = '%s=%s' % (par, value)
587 blocks.append(par)
587 blocks.append(par)
588 return ' '.join(blocks)
588 return ' '.join(blocks)
589
589
590 def _getcorechunk(self):
590 def _getcorechunk(self):
591 """yield chunk for the core part of the bundle
591 """yield chunk for the core part of the bundle
592
592
593 (all but headers and parameters)"""
593 (all but headers and parameters)"""
594 outdebug(self.ui, 'start of parts')
594 outdebug(self.ui, 'start of parts')
595 for part in self._parts:
595 for part in self._parts:
596 outdebug(self.ui, 'bundle part: "%s"' % part.type)
596 outdebug(self.ui, 'bundle part: "%s"' % part.type)
597 for chunk in part.getchunks(ui=self.ui):
597 for chunk in part.getchunks(ui=self.ui):
598 yield chunk
598 yield chunk
599 outdebug(self.ui, 'end of bundle')
599 outdebug(self.ui, 'end of bundle')
600 yield _pack(_fpartheadersize, 0)
600 yield _pack(_fpartheadersize, 0)
601
601
602
602
603 def salvageoutput(self):
603 def salvageoutput(self):
604 """return a list with a copy of all output parts in the bundle
604 """return a list with a copy of all output parts in the bundle
605
605
606 This is meant to be used during error handling to make sure we preserve
606 This is meant to be used during error handling to make sure we preserve
607 server output"""
607 server output"""
608 salvaged = []
608 salvaged = []
609 for part in self._parts:
609 for part in self._parts:
610 if part.type.startswith('output'):
610 if part.type.startswith('output'):
611 salvaged.append(part.copy())
611 salvaged.append(part.copy())
612 return salvaged
612 return salvaged
613
613
614
614
615 class unpackermixin(object):
615 class unpackermixin(object):
616 """A mixin to extract bytes and struct data from a stream"""
616 """A mixin to extract bytes and struct data from a stream"""
617
617
618 def __init__(self, fp):
618 def __init__(self, fp):
619 self._fp = fp
619 self._fp = fp
620 self._seekable = (util.safehasattr(fp, 'seek') and
620 self._seekable = (util.safehasattr(fp, 'seek') and
621 util.safehasattr(fp, 'tell'))
621 util.safehasattr(fp, 'tell'))
622
622
623 def _unpack(self, format):
623 def _unpack(self, format):
624 """unpack this struct format from the stream"""
624 """unpack this struct format from the stream"""
625 data = self._readexact(struct.calcsize(format))
625 data = self._readexact(struct.calcsize(format))
626 return _unpack(format, data)
626 return _unpack(format, data)
627
627
628 def _readexact(self, size):
628 def _readexact(self, size):
629 """read exactly <size> bytes from the stream"""
629 """read exactly <size> bytes from the stream"""
630 return changegroup.readexactly(self._fp, size)
630 return changegroup.readexactly(self._fp, size)
631
631
632 def seek(self, offset, whence=0):
632 def seek(self, offset, whence=0):
633 """move the underlying file pointer"""
633 """move the underlying file pointer"""
634 if self._seekable:
634 if self._seekable:
635 return self._fp.seek(offset, whence)
635 return self._fp.seek(offset, whence)
636 else:
636 else:
637 raise NotImplementedError(_('File pointer is not seekable'))
637 raise NotImplementedError(_('File pointer is not seekable'))
638
638
639 def tell(self):
639 def tell(self):
640 """return the file offset, or None if file is not seekable"""
640 """return the file offset, or None if file is not seekable"""
641 if self._seekable:
641 if self._seekable:
642 try:
642 try:
643 return self._fp.tell()
643 return self._fp.tell()
644 except IOError as e:
644 except IOError as e:
645 if e.errno == errno.ESPIPE:
645 if e.errno == errno.ESPIPE:
646 self._seekable = False
646 self._seekable = False
647 else:
647 else:
648 raise
648 raise
649 return None
649 return None
650
650
651 def close(self):
651 def close(self):
652 """close underlying file"""
652 """close underlying file"""
653 if util.safehasattr(self._fp, 'close'):
653 if util.safehasattr(self._fp, 'close'):
654 return self._fp.close()
654 return self._fp.close()
655
655
656 def getunbundler(ui, fp, magicstring=None):
656 def getunbundler(ui, fp, magicstring=None):
657 """return a valid unbundler object for a given magicstring"""
657 """return a valid unbundler object for a given magicstring"""
658 if magicstring is None:
658 if magicstring is None:
659 magicstring = changegroup.readexactly(fp, 4)
659 magicstring = changegroup.readexactly(fp, 4)
660 magic, version = magicstring[0:2], magicstring[2:4]
660 magic, version = magicstring[0:2], magicstring[2:4]
661 if magic != 'HG':
661 if magic != 'HG':
662 raise error.Abort(_('not a Mercurial bundle'))
662 raise error.Abort(_('not a Mercurial bundle'))
663 unbundlerclass = formatmap.get(version)
663 unbundlerclass = formatmap.get(version)
664 if unbundlerclass is None:
664 if unbundlerclass is None:
665 raise error.Abort(_('unknown bundle version %s') % version)
665 raise error.Abort(_('unknown bundle version %s') % version)
666 unbundler = unbundlerclass(ui, fp)
666 unbundler = unbundlerclass(ui, fp)
667 indebug(ui, 'start processing of %s stream' % magicstring)
667 indebug(ui, 'start processing of %s stream' % magicstring)
668 return unbundler
668 return unbundler
669
669
670 class unbundle20(unpackermixin):
670 class unbundle20(unpackermixin):
671 """interpret a bundle2 stream
671 """interpret a bundle2 stream
672
672
673 This class is fed with a binary stream and yields parts through its
673 This class is fed with a binary stream and yields parts through its
674 `iterparts` methods."""
674 `iterparts` methods."""
675
675
676 _magicstring = 'HG20'
676 _magicstring = 'HG20'
677
677
678 def __init__(self, ui, fp):
678 def __init__(self, ui, fp):
679 """If header is specified, we do not read it out of the stream."""
679 """If header is specified, we do not read it out of the stream."""
680 self.ui = ui
680 self.ui = ui
681 self._compengine = util.compengines.forbundletype('UN')
681 self._compengine = util.compengines.forbundletype('UN')
682 self._compressed = None
682 self._compressed = None
683 super(unbundle20, self).__init__(fp)
683 super(unbundle20, self).__init__(fp)
684
684
685 @util.propertycache
685 @util.propertycache
686 def params(self):
686 def params(self):
687 """dictionary of stream level parameters"""
687 """dictionary of stream level parameters"""
688 indebug(self.ui, 'reading bundle2 stream parameters')
688 indebug(self.ui, 'reading bundle2 stream parameters')
689 params = {}
689 params = {}
690 paramssize = self._unpack(_fstreamparamsize)[0]
690 paramssize = self._unpack(_fstreamparamsize)[0]
691 if paramssize < 0:
691 if paramssize < 0:
692 raise error.BundleValueError('negative bundle param size: %i'
692 raise error.BundleValueError('negative bundle param size: %i'
693 % paramssize)
693 % paramssize)
694 if paramssize:
694 if paramssize:
695 params = self._readexact(paramssize)
695 params = self._readexact(paramssize)
696 params = self._processallparams(params)
696 params = self._processallparams(params)
697 return params
697 return params
698
698
699 def _processallparams(self, paramsblock):
699 def _processallparams(self, paramsblock):
700 """"""
700 """"""
701 params = util.sortdict()
701 params = util.sortdict()
702 for p in paramsblock.split(' '):
702 for p in paramsblock.split(' '):
703 p = p.split('=', 1)
703 p = p.split('=', 1)
704 p = [urlreq.unquote(i) for i in p]
704 p = [urlreq.unquote(i) for i in p]
705 if len(p) < 2:
705 if len(p) < 2:
706 p.append(None)
706 p.append(None)
707 self._processparam(*p)
707 self._processparam(*p)
708 params[p[0]] = p[1]
708 params[p[0]] = p[1]
709 return params
709 return params
710
710
711
711
712 def _processparam(self, name, value):
712 def _processparam(self, name, value):
713 """process a parameter, applying its effect if needed
713 """process a parameter, applying its effect if needed
714
714
715 Parameter starting with a lower case letter are advisory and will be
715 Parameter starting with a lower case letter are advisory and will be
716 ignored when unknown. Those starting with an upper case letter are
716 ignored when unknown. Those starting with an upper case letter are
717 mandatory and will this function will raise a KeyError when unknown.
717 mandatory and will this function will raise a KeyError when unknown.
718
718
719 Note: no option are currently supported. Any input will be either
719 Note: no option are currently supported. Any input will be either
720 ignored or failing.
720 ignored or failing.
721 """
721 """
722 if not name:
722 if not name:
723 raise ValueError('empty parameter name')
723 raise ValueError('empty parameter name')
724 if name[0] not in string.letters:
724 if name[0] not in string.letters:
725 raise ValueError('non letter first character: %r' % name)
725 raise ValueError('non letter first character: %r' % name)
726 try:
726 try:
727 handler = b2streamparamsmap[name.lower()]
727 handler = b2streamparamsmap[name.lower()]
728 except KeyError:
728 except KeyError:
729 if name[0].islower():
729 if name[0].islower():
730 indebug(self.ui, "ignoring unknown parameter %r" % name)
730 indebug(self.ui, "ignoring unknown parameter %r" % name)
731 else:
731 else:
732 raise error.BundleUnknownFeatureError(params=(name,))
732 raise error.BundleUnknownFeatureError(params=(name,))
733 else:
733 else:
734 handler(self, name, value)
734 handler(self, name, value)
735
735
736 def _forwardchunks(self):
736 def _forwardchunks(self):
737 """utility to transfer a bundle2 as binary
737 """utility to transfer a bundle2 as binary
738
738
739 This is made necessary by the fact the 'getbundle' command over 'ssh'
739 This is made necessary by the fact the 'getbundle' command over 'ssh'
740 have no way to know then the reply end, relying on the bundle to be
740 have no way to know then the reply end, relying on the bundle to be
741 interpreted to know its end. This is terrible and we are sorry, but we
741 interpreted to know its end. This is terrible and we are sorry, but we
742 needed to move forward to get general delta enabled.
742 needed to move forward to get general delta enabled.
743 """
743 """
744 yield self._magicstring
744 yield self._magicstring
745 assert 'params' not in vars(self)
745 assert 'params' not in vars(self)
746 paramssize = self._unpack(_fstreamparamsize)[0]
746 paramssize = self._unpack(_fstreamparamsize)[0]
747 if paramssize < 0:
747 if paramssize < 0:
748 raise error.BundleValueError('negative bundle param size: %i'
748 raise error.BundleValueError('negative bundle param size: %i'
749 % paramssize)
749 % paramssize)
750 yield _pack(_fstreamparamsize, paramssize)
750 yield _pack(_fstreamparamsize, paramssize)
751 if paramssize:
751 if paramssize:
752 params = self._readexact(paramssize)
752 params = self._readexact(paramssize)
753 self._processallparams(params)
753 self._processallparams(params)
754 yield params
754 yield params
755 assert self._compengine.bundletype == 'UN'
755 assert self._compengine.bundletype == 'UN'
756 # From there, payload might need to be decompressed
756 # From there, payload might need to be decompressed
757 self._fp = self._compengine.decompressorreader(self._fp)
757 self._fp = self._compengine.decompressorreader(self._fp)
758 emptycount = 0
758 emptycount = 0
759 while emptycount < 2:
759 while emptycount < 2:
760 # so we can brainlessly loop
760 # so we can brainlessly loop
761 assert _fpartheadersize == _fpayloadsize
761 assert _fpartheadersize == _fpayloadsize
762 size = self._unpack(_fpartheadersize)[0]
762 size = self._unpack(_fpartheadersize)[0]
763 yield _pack(_fpartheadersize, size)
763 yield _pack(_fpartheadersize, size)
764 if size:
764 if size:
765 emptycount = 0
765 emptycount = 0
766 else:
766 else:
767 emptycount += 1
767 emptycount += 1
768 continue
768 continue
769 if size == flaginterrupt:
769 if size == flaginterrupt:
770 continue
770 continue
771 elif size < 0:
771 elif size < 0:
772 raise error.BundleValueError('negative chunk size: %i')
772 raise error.BundleValueError('negative chunk size: %i')
773 yield self._readexact(size)
773 yield self._readexact(size)
774
774
775
775
776 def iterparts(self):
776 def iterparts(self):
777 """yield all parts contained in the stream"""
777 """yield all parts contained in the stream"""
778 # make sure param have been loaded
778 # make sure param have been loaded
779 self.params
779 self.params
780 # From there, payload need to be decompressed
780 # From there, payload need to be decompressed
781 self._fp = self._compengine.decompressorreader(self._fp)
781 self._fp = self._compengine.decompressorreader(self._fp)
782 indebug(self.ui, 'start extraction of bundle2 parts')
782 indebug(self.ui, 'start extraction of bundle2 parts')
783 headerblock = self._readpartheader()
783 headerblock = self._readpartheader()
784 while headerblock is not None:
784 while headerblock is not None:
785 part = unbundlepart(self.ui, headerblock, self._fp)
785 part = unbundlepart(self.ui, headerblock, self._fp)
786 yield part
786 yield part
787 part.seek(0, 2)
787 part.seek(0, 2)
788 headerblock = self._readpartheader()
788 headerblock = self._readpartheader()
789 indebug(self.ui, 'end of bundle2 stream')
789 indebug(self.ui, 'end of bundle2 stream')
790
790
791 def _readpartheader(self):
791 def _readpartheader(self):
792 """reads a part header size and return the bytes blob
792 """reads a part header size and return the bytes blob
793
793
794 returns None if empty"""
794 returns None if empty"""
795 headersize = self._unpack(_fpartheadersize)[0]
795 headersize = self._unpack(_fpartheadersize)[0]
796 if headersize < 0:
796 if headersize < 0:
797 raise error.BundleValueError('negative part header size: %i'
797 raise error.BundleValueError('negative part header size: %i'
798 % headersize)
798 % headersize)
799 indebug(self.ui, 'part header size: %i' % headersize)
799 indebug(self.ui, 'part header size: %i' % headersize)
800 if headersize:
800 if headersize:
801 return self._readexact(headersize)
801 return self._readexact(headersize)
802 return None
802 return None
803
803
804 def compressed(self):
804 def compressed(self):
805 self.params # load params
805 self.params # load params
806 return self._compressed
806 return self._compressed
807
807
808 formatmap = {'20': unbundle20}
808 formatmap = {'20': unbundle20}
809
809
810 b2streamparamsmap = {}
810 b2streamparamsmap = {}
811
811
812 def b2streamparamhandler(name):
812 def b2streamparamhandler(name):
813 """register a handler for a stream level parameter"""
813 """register a handler for a stream level parameter"""
814 def decorator(func):
814 def decorator(func):
815 assert name not in formatmap
815 assert name not in formatmap
816 b2streamparamsmap[name] = func
816 b2streamparamsmap[name] = func
817 return func
817 return func
818 return decorator
818 return decorator
819
819
820 @b2streamparamhandler('compression')
820 @b2streamparamhandler('compression')
821 def processcompression(unbundler, param, value):
821 def processcompression(unbundler, param, value):
822 """read compression parameter and install payload decompression"""
822 """read compression parameter and install payload decompression"""
823 if value not in util.compengines.supportedbundletypes:
823 if value not in util.compengines.supportedbundletypes:
824 raise error.BundleUnknownFeatureError(params=(param,),
824 raise error.BundleUnknownFeatureError(params=(param,),
825 values=(value,))
825 values=(value,))
826 unbundler._compengine = util.compengines.forbundletype(value)
826 unbundler._compengine = util.compengines.forbundletype(value)
827 if value is not None:
827 if value is not None:
828 unbundler._compressed = True
828 unbundler._compressed = True
829
829
830 class bundlepart(object):
830 class bundlepart(object):
831 """A bundle2 part contains application level payload
831 """A bundle2 part contains application level payload
832
832
833 The part `type` is used to route the part to the application level
833 The part `type` is used to route the part to the application level
834 handler.
834 handler.
835
835
836 The part payload is contained in ``part.data``. It could be raw bytes or a
836 The part payload is contained in ``part.data``. It could be raw bytes or a
837 generator of byte chunks.
837 generator of byte chunks.
838
838
839 You can add parameters to the part using the ``addparam`` method.
839 You can add parameters to the part using the ``addparam`` method.
840 Parameters can be either mandatory (default) or advisory. Remote side
840 Parameters can be either mandatory (default) or advisory. Remote side
841 should be able to safely ignore the advisory ones.
841 should be able to safely ignore the advisory ones.
842
842
843 Both data and parameters cannot be modified after the generation has begun.
843 Both data and parameters cannot be modified after the generation has begun.
844 """
844 """
845
845
846 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
846 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
847 data='', mandatory=True):
847 data='', mandatory=True):
848 validateparttype(parttype)
848 validateparttype(parttype)
849 self.id = None
849 self.id = None
850 self.type = parttype
850 self.type = parttype
851 self._data = data
851 self._data = data
852 self._mandatoryparams = list(mandatoryparams)
852 self._mandatoryparams = list(mandatoryparams)
853 self._advisoryparams = list(advisoryparams)
853 self._advisoryparams = list(advisoryparams)
854 # checking for duplicated entries
854 # checking for duplicated entries
855 self._seenparams = set()
855 self._seenparams = set()
856 for pname, __ in self._mandatoryparams + self._advisoryparams:
856 for pname, __ in self._mandatoryparams + self._advisoryparams:
857 if pname in self._seenparams:
857 if pname in self._seenparams:
858 raise error.ProgrammingError('duplicated params: %s' % pname)
858 raise error.ProgrammingError('duplicated params: %s' % pname)
859 self._seenparams.add(pname)
859 self._seenparams.add(pname)
860 # status of the part's generation:
860 # status of the part's generation:
861 # - None: not started,
861 # - None: not started,
862 # - False: currently generated,
862 # - False: currently generated,
863 # - True: generation done.
863 # - True: generation done.
864 self._generated = None
864 self._generated = None
865 self.mandatory = mandatory
865 self.mandatory = mandatory
866
866
867 def __repr__(self):
867 def __repr__(self):
868 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
868 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
869 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
869 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
870 % (cls, id(self), self.id, self.type, self.mandatory))
870 % (cls, id(self), self.id, self.type, self.mandatory))
871
871
872 def copy(self):
872 def copy(self):
873 """return a copy of the part
873 """return a copy of the part
874
874
875 The new part have the very same content but no partid assigned yet.
875 The new part have the very same content but no partid assigned yet.
876 Parts with generated data cannot be copied."""
876 Parts with generated data cannot be copied."""
877 assert not util.safehasattr(self.data, 'next')
877 assert not util.safehasattr(self.data, 'next')
878 return self.__class__(self.type, self._mandatoryparams,
878 return self.__class__(self.type, self._mandatoryparams,
879 self._advisoryparams, self._data, self.mandatory)
879 self._advisoryparams, self._data, self.mandatory)
880
880
881 # methods used to defines the part content
881 # methods used to defines the part content
882 @property
882 @property
883 def data(self):
883 def data(self):
884 return self._data
884 return self._data
885
885
886 @data.setter
886 @data.setter
887 def data(self, data):
887 def data(self, data):
888 if self._generated is not None:
888 if self._generated is not None:
889 raise error.ReadOnlyPartError('part is being generated')
889 raise error.ReadOnlyPartError('part is being generated')
890 self._data = data
890 self._data = data
891
891
892 @property
892 @property
893 def mandatoryparams(self):
893 def mandatoryparams(self):
894 # make it an immutable tuple to force people through ``addparam``
894 # make it an immutable tuple to force people through ``addparam``
895 return tuple(self._mandatoryparams)
895 return tuple(self._mandatoryparams)
896
896
897 @property
897 @property
898 def advisoryparams(self):
898 def advisoryparams(self):
899 # make it an immutable tuple to force people through ``addparam``
899 # make it an immutable tuple to force people through ``addparam``
900 return tuple(self._advisoryparams)
900 return tuple(self._advisoryparams)
901
901
902 def addparam(self, name, value='', mandatory=True):
902 def addparam(self, name, value='', mandatory=True):
903 """add a parameter to the part
904
905 If 'mandatory' is set to True, the remote handler must claim support
906 for this parameter or the unbundling will be aborted.
907
908 The 'name' and 'value' cannot exceed 255 bytes each.
909 """
903 if self._generated is not None:
910 if self._generated is not None:
904 raise error.ReadOnlyPartError('part is being generated')
911 raise error.ReadOnlyPartError('part is being generated')
905 if name in self._seenparams:
912 if name in self._seenparams:
906 raise ValueError('duplicated params: %s' % name)
913 raise ValueError('duplicated params: %s' % name)
907 self._seenparams.add(name)
914 self._seenparams.add(name)
908 params = self._advisoryparams
915 params = self._advisoryparams
909 if mandatory:
916 if mandatory:
910 params = self._mandatoryparams
917 params = self._mandatoryparams
911 params.append((name, value))
918 params.append((name, value))
912
919
913 # methods used to generates the bundle2 stream
920 # methods used to generates the bundle2 stream
914 def getchunks(self, ui):
921 def getchunks(self, ui):
915 if self._generated is not None:
922 if self._generated is not None:
916 raise error.ProgrammingError('part can only be consumed once')
923 raise error.ProgrammingError('part can only be consumed once')
917 self._generated = False
924 self._generated = False
918
925
919 if ui.debugflag:
926 if ui.debugflag:
920 msg = ['bundle2-output-part: "%s"' % self.type]
927 msg = ['bundle2-output-part: "%s"' % self.type]
921 if not self.mandatory:
928 if not self.mandatory:
922 msg.append(' (advisory)')
929 msg.append(' (advisory)')
923 nbmp = len(self.mandatoryparams)
930 nbmp = len(self.mandatoryparams)
924 nbap = len(self.advisoryparams)
931 nbap = len(self.advisoryparams)
925 if nbmp or nbap:
932 if nbmp or nbap:
926 msg.append(' (params:')
933 msg.append(' (params:')
927 if nbmp:
934 if nbmp:
928 msg.append(' %i mandatory' % nbmp)
935 msg.append(' %i mandatory' % nbmp)
929 if nbap:
936 if nbap:
930 msg.append(' %i advisory' % nbmp)
937 msg.append(' %i advisory' % nbmp)
931 msg.append(')')
938 msg.append(')')
932 if not self.data:
939 if not self.data:
933 msg.append(' empty payload')
940 msg.append(' empty payload')
934 elif util.safehasattr(self.data, 'next'):
941 elif util.safehasattr(self.data, 'next'):
935 msg.append(' streamed payload')
942 msg.append(' streamed payload')
936 else:
943 else:
937 msg.append(' %i bytes payload' % len(self.data))
944 msg.append(' %i bytes payload' % len(self.data))
938 msg.append('\n')
945 msg.append('\n')
939 ui.debug(''.join(msg))
946 ui.debug(''.join(msg))
940
947
941 #### header
948 #### header
942 if self.mandatory:
949 if self.mandatory:
943 parttype = self.type.upper()
950 parttype = self.type.upper()
944 else:
951 else:
945 parttype = self.type.lower()
952 parttype = self.type.lower()
946 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
953 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
947 ## parttype
954 ## parttype
948 header = [_pack(_fparttypesize, len(parttype)),
955 header = [_pack(_fparttypesize, len(parttype)),
949 parttype, _pack(_fpartid, self.id),
956 parttype, _pack(_fpartid, self.id),
950 ]
957 ]
951 ## parameters
958 ## parameters
952 # count
959 # count
953 manpar = self.mandatoryparams
960 manpar = self.mandatoryparams
954 advpar = self.advisoryparams
961 advpar = self.advisoryparams
955 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
962 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
956 # size
963 # size
957 parsizes = []
964 parsizes = []
958 for key, value in manpar:
965 for key, value in manpar:
959 parsizes.append(len(key))
966 parsizes.append(len(key))
960 parsizes.append(len(value))
967 parsizes.append(len(value))
961 for key, value in advpar:
968 for key, value in advpar:
962 parsizes.append(len(key))
969 parsizes.append(len(key))
963 parsizes.append(len(value))
970 parsizes.append(len(value))
964 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
971 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
965 header.append(paramsizes)
972 header.append(paramsizes)
966 # key, value
973 # key, value
967 for key, value in manpar:
974 for key, value in manpar:
968 header.append(key)
975 header.append(key)
969 header.append(value)
976 header.append(value)
970 for key, value in advpar:
977 for key, value in advpar:
971 header.append(key)
978 header.append(key)
972 header.append(value)
979 header.append(value)
973 ## finalize header
980 ## finalize header
974 headerchunk = ''.join(header)
981 headerchunk = ''.join(header)
975 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
982 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
976 yield _pack(_fpartheadersize, len(headerchunk))
983 yield _pack(_fpartheadersize, len(headerchunk))
977 yield headerchunk
984 yield headerchunk
978 ## payload
985 ## payload
979 try:
986 try:
980 for chunk in self._payloadchunks():
987 for chunk in self._payloadchunks():
981 outdebug(ui, 'payload chunk size: %i' % len(chunk))
988 outdebug(ui, 'payload chunk size: %i' % len(chunk))
982 yield _pack(_fpayloadsize, len(chunk))
989 yield _pack(_fpayloadsize, len(chunk))
983 yield chunk
990 yield chunk
984 except GeneratorExit:
991 except GeneratorExit:
985 # GeneratorExit means that nobody is listening for our
992 # GeneratorExit means that nobody is listening for our
986 # results anyway, so just bail quickly rather than trying
993 # results anyway, so just bail quickly rather than trying
987 # to produce an error part.
994 # to produce an error part.
988 ui.debug('bundle2-generatorexit\n')
995 ui.debug('bundle2-generatorexit\n')
989 raise
996 raise
990 except BaseException as exc:
997 except BaseException as exc:
991 # backup exception data for later
998 # backup exception data for later
992 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
999 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
993 % exc)
1000 % exc)
994 exc_info = sys.exc_info()
1001 exc_info = sys.exc_info()
995 msg = 'unexpected error: %s' % exc
1002 msg = 'unexpected error: %s' % exc
996 interpart = bundlepart('error:abort', [('message', msg)],
1003 interpart = bundlepart('error:abort', [('message', msg)],
997 mandatory=False)
1004 mandatory=False)
998 interpart.id = 0
1005 interpart.id = 0
999 yield _pack(_fpayloadsize, -1)
1006 yield _pack(_fpayloadsize, -1)
1000 for chunk in interpart.getchunks(ui=ui):
1007 for chunk in interpart.getchunks(ui=ui):
1001 yield chunk
1008 yield chunk
1002 outdebug(ui, 'closing payload chunk')
1009 outdebug(ui, 'closing payload chunk')
1003 # abort current part payload
1010 # abort current part payload
1004 yield _pack(_fpayloadsize, 0)
1011 yield _pack(_fpayloadsize, 0)
1005 if pycompat.ispy3:
1012 if pycompat.ispy3:
1006 raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
1013 raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
1007 else:
1014 else:
1008 exec("""raise exc_info[0], exc_info[1], exc_info[2]""")
1015 exec("""raise exc_info[0], exc_info[1], exc_info[2]""")
1009 # end of payload
1016 # end of payload
1010 outdebug(ui, 'closing payload chunk')
1017 outdebug(ui, 'closing payload chunk')
1011 yield _pack(_fpayloadsize, 0)
1018 yield _pack(_fpayloadsize, 0)
1012 self._generated = True
1019 self._generated = True
1013
1020
1014 def _payloadchunks(self):
1021 def _payloadchunks(self):
1015 """yield chunks of a the part payload
1022 """yield chunks of a the part payload
1016
1023
1017 Exists to handle the different methods to provide data to a part."""
1024 Exists to handle the different methods to provide data to a part."""
1018 # we only support fixed size data now.
1025 # we only support fixed size data now.
1019 # This will be improved in the future.
1026 # This will be improved in the future.
1020 if util.safehasattr(self.data, 'next'):
1027 if util.safehasattr(self.data, 'next'):
1021 buff = util.chunkbuffer(self.data)
1028 buff = util.chunkbuffer(self.data)
1022 chunk = buff.read(preferedchunksize)
1029 chunk = buff.read(preferedchunksize)
1023 while chunk:
1030 while chunk:
1024 yield chunk
1031 yield chunk
1025 chunk = buff.read(preferedchunksize)
1032 chunk = buff.read(preferedchunksize)
1026 elif len(self.data):
1033 elif len(self.data):
1027 yield self.data
1034 yield self.data
1028
1035
1029
1036
1030 flaginterrupt = -1
1037 flaginterrupt = -1
1031
1038
1032 class interrupthandler(unpackermixin):
1039 class interrupthandler(unpackermixin):
1033 """read one part and process it with restricted capability
1040 """read one part and process it with restricted capability
1034
1041
1035 This allows to transmit exception raised on the producer size during part
1042 This allows to transmit exception raised on the producer size during part
1036 iteration while the consumer is reading a part.
1043 iteration while the consumer is reading a part.
1037
1044
1038 Part processed in this manner only have access to a ui object,"""
1045 Part processed in this manner only have access to a ui object,"""
1039
1046
1040 def __init__(self, ui, fp):
1047 def __init__(self, ui, fp):
1041 super(interrupthandler, self).__init__(fp)
1048 super(interrupthandler, self).__init__(fp)
1042 self.ui = ui
1049 self.ui = ui
1043
1050
1044 def _readpartheader(self):
1051 def _readpartheader(self):
1045 """reads a part header size and return the bytes blob
1052 """reads a part header size and return the bytes blob
1046
1053
1047 returns None if empty"""
1054 returns None if empty"""
1048 headersize = self._unpack(_fpartheadersize)[0]
1055 headersize = self._unpack(_fpartheadersize)[0]
1049 if headersize < 0:
1056 if headersize < 0:
1050 raise error.BundleValueError('negative part header size: %i'
1057 raise error.BundleValueError('negative part header size: %i'
1051 % headersize)
1058 % headersize)
1052 indebug(self.ui, 'part header size: %i\n' % headersize)
1059 indebug(self.ui, 'part header size: %i\n' % headersize)
1053 if headersize:
1060 if headersize:
1054 return self._readexact(headersize)
1061 return self._readexact(headersize)
1055 return None
1062 return None
1056
1063
1057 def __call__(self):
1064 def __call__(self):
1058
1065
1059 self.ui.debug('bundle2-input-stream-interrupt:'
1066 self.ui.debug('bundle2-input-stream-interrupt:'
1060 ' opening out of band context\n')
1067 ' opening out of band context\n')
1061 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1068 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1062 headerblock = self._readpartheader()
1069 headerblock = self._readpartheader()
1063 if headerblock is None:
1070 if headerblock is None:
1064 indebug(self.ui, 'no part found during interruption.')
1071 indebug(self.ui, 'no part found during interruption.')
1065 return
1072 return
1066 part = unbundlepart(self.ui, headerblock, self._fp)
1073 part = unbundlepart(self.ui, headerblock, self._fp)
1067 op = interruptoperation(self.ui)
1074 op = interruptoperation(self.ui)
1068 _processpart(op, part)
1075 _processpart(op, part)
1069 self.ui.debug('bundle2-input-stream-interrupt:'
1076 self.ui.debug('bundle2-input-stream-interrupt:'
1070 ' closing out of band context\n')
1077 ' closing out of band context\n')
1071
1078
1072 class interruptoperation(object):
1079 class interruptoperation(object):
1073 """A limited operation to be use by part handler during interruption
1080 """A limited operation to be use by part handler during interruption
1074
1081
1075 It only have access to an ui object.
1082 It only have access to an ui object.
1076 """
1083 """
1077
1084
1078 def __init__(self, ui):
1085 def __init__(self, ui):
1079 self.ui = ui
1086 self.ui = ui
1080 self.reply = None
1087 self.reply = None
1081 self.captureoutput = False
1088 self.captureoutput = False
1082
1089
1083 @property
1090 @property
1084 def repo(self):
1091 def repo(self):
1085 raise error.ProgrammingError('no repo access from stream interruption')
1092 raise error.ProgrammingError('no repo access from stream interruption')
1086
1093
1087 def gettransaction(self):
1094 def gettransaction(self):
1088 raise TransactionUnavailable('no repo access from stream interruption')
1095 raise TransactionUnavailable('no repo access from stream interruption')
1089
1096
1090 class unbundlepart(unpackermixin):
1097 class unbundlepart(unpackermixin):
1091 """a bundle part read from a bundle"""
1098 """a bundle part read from a bundle"""
1092
1099
1093 def __init__(self, ui, header, fp):
1100 def __init__(self, ui, header, fp):
1094 super(unbundlepart, self).__init__(fp)
1101 super(unbundlepart, self).__init__(fp)
1095 self.ui = ui
1102 self.ui = ui
1096 # unbundle state attr
1103 # unbundle state attr
1097 self._headerdata = header
1104 self._headerdata = header
1098 self._headeroffset = 0
1105 self._headeroffset = 0
1099 self._initialized = False
1106 self._initialized = False
1100 self.consumed = False
1107 self.consumed = False
1101 # part data
1108 # part data
1102 self.id = None
1109 self.id = None
1103 self.type = None
1110 self.type = None
1104 self.mandatoryparams = None
1111 self.mandatoryparams = None
1105 self.advisoryparams = None
1112 self.advisoryparams = None
1106 self.params = None
1113 self.params = None
1107 self.mandatorykeys = ()
1114 self.mandatorykeys = ()
1108 self._payloadstream = None
1115 self._payloadstream = None
1109 self._readheader()
1116 self._readheader()
1110 self._mandatory = None
1117 self._mandatory = None
1111 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1118 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1112 self._pos = 0
1119 self._pos = 0
1113
1120
1114 def _fromheader(self, size):
1121 def _fromheader(self, size):
1115 """return the next <size> byte from the header"""
1122 """return the next <size> byte from the header"""
1116 offset = self._headeroffset
1123 offset = self._headeroffset
1117 data = self._headerdata[offset:(offset + size)]
1124 data = self._headerdata[offset:(offset + size)]
1118 self._headeroffset = offset + size
1125 self._headeroffset = offset + size
1119 return data
1126 return data
1120
1127
1121 def _unpackheader(self, format):
1128 def _unpackheader(self, format):
1122 """read given format from header
1129 """read given format from header
1123
1130
1124 This automatically compute the size of the format to read."""
1131 This automatically compute the size of the format to read."""
1125 data = self._fromheader(struct.calcsize(format))
1132 data = self._fromheader(struct.calcsize(format))
1126 return _unpack(format, data)
1133 return _unpack(format, data)
1127
1134
1128 def _initparams(self, mandatoryparams, advisoryparams):
1135 def _initparams(self, mandatoryparams, advisoryparams):
1129 """internal function to setup all logic related parameters"""
1136 """internal function to setup all logic related parameters"""
1130 # make it read only to prevent people touching it by mistake.
1137 # make it read only to prevent people touching it by mistake.
1131 self.mandatoryparams = tuple(mandatoryparams)
1138 self.mandatoryparams = tuple(mandatoryparams)
1132 self.advisoryparams = tuple(advisoryparams)
1139 self.advisoryparams = tuple(advisoryparams)
1133 # user friendly UI
1140 # user friendly UI
1134 self.params = util.sortdict(self.mandatoryparams)
1141 self.params = util.sortdict(self.mandatoryparams)
1135 self.params.update(self.advisoryparams)
1142 self.params.update(self.advisoryparams)
1136 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1143 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1137
1144
1138 def _payloadchunks(self, chunknum=0):
1145 def _payloadchunks(self, chunknum=0):
1139 '''seek to specified chunk and start yielding data'''
1146 '''seek to specified chunk and start yielding data'''
1140 if len(self._chunkindex) == 0:
1147 if len(self._chunkindex) == 0:
1141 assert chunknum == 0, 'Must start with chunk 0'
1148 assert chunknum == 0, 'Must start with chunk 0'
1142 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1149 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1143 else:
1150 else:
1144 assert chunknum < len(self._chunkindex), \
1151 assert chunknum < len(self._chunkindex), \
1145 'Unknown chunk %d' % chunknum
1152 'Unknown chunk %d' % chunknum
1146 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1153 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1147
1154
1148 pos = self._chunkindex[chunknum][0]
1155 pos = self._chunkindex[chunknum][0]
1149 payloadsize = self._unpack(_fpayloadsize)[0]
1156 payloadsize = self._unpack(_fpayloadsize)[0]
1150 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1157 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1151 while payloadsize:
1158 while payloadsize:
1152 if payloadsize == flaginterrupt:
1159 if payloadsize == flaginterrupt:
1153 # interruption detection, the handler will now read a
1160 # interruption detection, the handler will now read a
1154 # single part and process it.
1161 # single part and process it.
1155 interrupthandler(self.ui, self._fp)()
1162 interrupthandler(self.ui, self._fp)()
1156 elif payloadsize < 0:
1163 elif payloadsize < 0:
1157 msg = 'negative payload chunk size: %i' % payloadsize
1164 msg = 'negative payload chunk size: %i' % payloadsize
1158 raise error.BundleValueError(msg)
1165 raise error.BundleValueError(msg)
1159 else:
1166 else:
1160 result = self._readexact(payloadsize)
1167 result = self._readexact(payloadsize)
1161 chunknum += 1
1168 chunknum += 1
1162 pos += payloadsize
1169 pos += payloadsize
1163 if chunknum == len(self._chunkindex):
1170 if chunknum == len(self._chunkindex):
1164 self._chunkindex.append((pos,
1171 self._chunkindex.append((pos,
1165 super(unbundlepart, self).tell()))
1172 super(unbundlepart, self).tell()))
1166 yield result
1173 yield result
1167 payloadsize = self._unpack(_fpayloadsize)[0]
1174 payloadsize = self._unpack(_fpayloadsize)[0]
1168 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1175 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1169
1176
1170 def _findchunk(self, pos):
1177 def _findchunk(self, pos):
1171 '''for a given payload position, return a chunk number and offset'''
1178 '''for a given payload position, return a chunk number and offset'''
1172 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1179 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1173 if ppos == pos:
1180 if ppos == pos:
1174 return chunk, 0
1181 return chunk, 0
1175 elif ppos > pos:
1182 elif ppos > pos:
1176 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1183 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1177 raise ValueError('Unknown chunk')
1184 raise ValueError('Unknown chunk')
1178
1185
1179 def _readheader(self):
1186 def _readheader(self):
1180 """read the header and setup the object"""
1187 """read the header and setup the object"""
1181 typesize = self._unpackheader(_fparttypesize)[0]
1188 typesize = self._unpackheader(_fparttypesize)[0]
1182 self.type = self._fromheader(typesize)
1189 self.type = self._fromheader(typesize)
1183 indebug(self.ui, 'part type: "%s"' % self.type)
1190 indebug(self.ui, 'part type: "%s"' % self.type)
1184 self.id = self._unpackheader(_fpartid)[0]
1191 self.id = self._unpackheader(_fpartid)[0]
1185 indebug(self.ui, 'part id: "%s"' % self.id)
1192 indebug(self.ui, 'part id: "%s"' % self.id)
1186 # extract mandatory bit from type
1193 # extract mandatory bit from type
1187 self.mandatory = (self.type != self.type.lower())
1194 self.mandatory = (self.type != self.type.lower())
1188 self.type = self.type.lower()
1195 self.type = self.type.lower()
1189 ## reading parameters
1196 ## reading parameters
1190 # param count
1197 # param count
1191 mancount, advcount = self._unpackheader(_fpartparamcount)
1198 mancount, advcount = self._unpackheader(_fpartparamcount)
1192 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1199 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1193 # param size
1200 # param size
1194 fparamsizes = _makefpartparamsizes(mancount + advcount)
1201 fparamsizes = _makefpartparamsizes(mancount + advcount)
1195 paramsizes = self._unpackheader(fparamsizes)
1202 paramsizes = self._unpackheader(fparamsizes)
1196 # make it a list of couple again
1203 # make it a list of couple again
1197 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1204 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1198 # split mandatory from advisory
1205 # split mandatory from advisory
1199 mansizes = paramsizes[:mancount]
1206 mansizes = paramsizes[:mancount]
1200 advsizes = paramsizes[mancount:]
1207 advsizes = paramsizes[mancount:]
1201 # retrieve param value
1208 # retrieve param value
1202 manparams = []
1209 manparams = []
1203 for key, value in mansizes:
1210 for key, value in mansizes:
1204 manparams.append((self._fromheader(key), self._fromheader(value)))
1211 manparams.append((self._fromheader(key), self._fromheader(value)))
1205 advparams = []
1212 advparams = []
1206 for key, value in advsizes:
1213 for key, value in advsizes:
1207 advparams.append((self._fromheader(key), self._fromheader(value)))
1214 advparams.append((self._fromheader(key), self._fromheader(value)))
1208 self._initparams(manparams, advparams)
1215 self._initparams(manparams, advparams)
1209 ## part payload
1216 ## part payload
1210 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1217 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1211 # we read the data, tell it
1218 # we read the data, tell it
1212 self._initialized = True
1219 self._initialized = True
1213
1220
1214 def read(self, size=None):
1221 def read(self, size=None):
1215 """read payload data"""
1222 """read payload data"""
1216 if not self._initialized:
1223 if not self._initialized:
1217 self._readheader()
1224 self._readheader()
1218 if size is None:
1225 if size is None:
1219 data = self._payloadstream.read()
1226 data = self._payloadstream.read()
1220 else:
1227 else:
1221 data = self._payloadstream.read(size)
1228 data = self._payloadstream.read(size)
1222 self._pos += len(data)
1229 self._pos += len(data)
1223 if size is None or len(data) < size:
1230 if size is None or len(data) < size:
1224 if not self.consumed and self._pos:
1231 if not self.consumed and self._pos:
1225 self.ui.debug('bundle2-input-part: total payload size %i\n'
1232 self.ui.debug('bundle2-input-part: total payload size %i\n'
1226 % self._pos)
1233 % self._pos)
1227 self.consumed = True
1234 self.consumed = True
1228 return data
1235 return data
1229
1236
1230 def tell(self):
1237 def tell(self):
1231 return self._pos
1238 return self._pos
1232
1239
1233 def seek(self, offset, whence=0):
1240 def seek(self, offset, whence=0):
1234 if whence == 0:
1241 if whence == 0:
1235 newpos = offset
1242 newpos = offset
1236 elif whence == 1:
1243 elif whence == 1:
1237 newpos = self._pos + offset
1244 newpos = self._pos + offset
1238 elif whence == 2:
1245 elif whence == 2:
1239 if not self.consumed:
1246 if not self.consumed:
1240 self.read()
1247 self.read()
1241 newpos = self._chunkindex[-1][0] - offset
1248 newpos = self._chunkindex[-1][0] - offset
1242 else:
1249 else:
1243 raise ValueError('Unknown whence value: %r' % (whence,))
1250 raise ValueError('Unknown whence value: %r' % (whence,))
1244
1251
1245 if newpos > self._chunkindex[-1][0] and not self.consumed:
1252 if newpos > self._chunkindex[-1][0] and not self.consumed:
1246 self.read()
1253 self.read()
1247 if not 0 <= newpos <= self._chunkindex[-1][0]:
1254 if not 0 <= newpos <= self._chunkindex[-1][0]:
1248 raise ValueError('Offset out of range')
1255 raise ValueError('Offset out of range')
1249
1256
1250 if self._pos != newpos:
1257 if self._pos != newpos:
1251 chunk, internaloffset = self._findchunk(newpos)
1258 chunk, internaloffset = self._findchunk(newpos)
1252 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1259 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1253 adjust = self.read(internaloffset)
1260 adjust = self.read(internaloffset)
1254 if len(adjust) != internaloffset:
1261 if len(adjust) != internaloffset:
1255 raise error.Abort(_('Seek failed\n'))
1262 raise error.Abort(_('Seek failed\n'))
1256 self._pos = newpos
1263 self._pos = newpos
1257
1264
1258 # These are only the static capabilities.
1265 # These are only the static capabilities.
1259 # Check the 'getrepocaps' function for the rest.
1266 # Check the 'getrepocaps' function for the rest.
1260 capabilities = {'HG20': (),
1267 capabilities = {'HG20': (),
1261 'error': ('abort', 'unsupportedcontent', 'pushraced',
1268 'error': ('abort', 'unsupportedcontent', 'pushraced',
1262 'pushkey'),
1269 'pushkey'),
1263 'listkeys': (),
1270 'listkeys': (),
1264 'pushkey': (),
1271 'pushkey': (),
1265 'digests': tuple(sorted(util.DIGESTS.keys())),
1272 'digests': tuple(sorted(util.DIGESTS.keys())),
1266 'remote-changegroup': ('http', 'https'),
1273 'remote-changegroup': ('http', 'https'),
1267 'hgtagsfnodes': (),
1274 'hgtagsfnodes': (),
1268 }
1275 }
1269
1276
1270 def getrepocaps(repo, allowpushback=False):
1277 def getrepocaps(repo, allowpushback=False):
1271 """return the bundle2 capabilities for a given repo
1278 """return the bundle2 capabilities for a given repo
1272
1279
1273 Exists to allow extensions (like evolution) to mutate the capabilities.
1280 Exists to allow extensions (like evolution) to mutate the capabilities.
1274 """
1281 """
1275 caps = capabilities.copy()
1282 caps = capabilities.copy()
1276 caps['changegroup'] = tuple(sorted(
1283 caps['changegroup'] = tuple(sorted(
1277 changegroup.supportedincomingversions(repo)))
1284 changegroup.supportedincomingversions(repo)))
1278 if obsolete.isenabled(repo, obsolete.exchangeopt):
1285 if obsolete.isenabled(repo, obsolete.exchangeopt):
1279 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1286 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1280 caps['obsmarkers'] = supportedformat
1287 caps['obsmarkers'] = supportedformat
1281 if allowpushback:
1288 if allowpushback:
1282 caps['pushback'] = ()
1289 caps['pushback'] = ()
1283 return caps
1290 return caps
1284
1291
1285 def bundle2caps(remote):
1292 def bundle2caps(remote):
1286 """return the bundle capabilities of a peer as dict"""
1293 """return the bundle capabilities of a peer as dict"""
1287 raw = remote.capable('bundle2')
1294 raw = remote.capable('bundle2')
1288 if not raw and raw != '':
1295 if not raw and raw != '':
1289 return {}
1296 return {}
1290 capsblob = urlreq.unquote(remote.capable('bundle2'))
1297 capsblob = urlreq.unquote(remote.capable('bundle2'))
1291 return decodecaps(capsblob)
1298 return decodecaps(capsblob)
1292
1299
1293 def obsmarkersversion(caps):
1300 def obsmarkersversion(caps):
1294 """extract the list of supported obsmarkers versions from a bundle2caps dict
1301 """extract the list of supported obsmarkers versions from a bundle2caps dict
1295 """
1302 """
1296 obscaps = caps.get('obsmarkers', ())
1303 obscaps = caps.get('obsmarkers', ())
1297 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1304 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1298
1305
1299 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1306 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1300 compopts=None):
1307 compopts=None):
1301 """Write a bundle file and return its filename.
1308 """Write a bundle file and return its filename.
1302
1309
1303 Existing files will not be overwritten.
1310 Existing files will not be overwritten.
1304 If no filename is specified, a temporary file is created.
1311 If no filename is specified, a temporary file is created.
1305 bz2 compression can be turned off.
1312 bz2 compression can be turned off.
1306 The bundle file will be deleted in case of errors.
1313 The bundle file will be deleted in case of errors.
1307 """
1314 """
1308
1315
1309 if bundletype == "HG20":
1316 if bundletype == "HG20":
1310 bundle = bundle20(ui)
1317 bundle = bundle20(ui)
1311 bundle.setcompression(compression, compopts)
1318 bundle.setcompression(compression, compopts)
1312 part = bundle.newpart('changegroup', data=cg.getchunks())
1319 part = bundle.newpart('changegroup', data=cg.getchunks())
1313 part.addparam('version', cg.version)
1320 part.addparam('version', cg.version)
1314 if 'clcount' in cg.extras:
1321 if 'clcount' in cg.extras:
1315 part.addparam('nbchanges', str(cg.extras['clcount']),
1322 part.addparam('nbchanges', str(cg.extras['clcount']),
1316 mandatory=False)
1323 mandatory=False)
1317 chunkiter = bundle.getchunks()
1324 chunkiter = bundle.getchunks()
1318 else:
1325 else:
1319 # compression argument is only for the bundle2 case
1326 # compression argument is only for the bundle2 case
1320 assert compression is None
1327 assert compression is None
1321 if cg.version != '01':
1328 if cg.version != '01':
1322 raise error.Abort(_('old bundle types only supports v1 '
1329 raise error.Abort(_('old bundle types only supports v1 '
1323 'changegroups'))
1330 'changegroups'))
1324 header, comp = bundletypes[bundletype]
1331 header, comp = bundletypes[bundletype]
1325 if comp not in util.compengines.supportedbundletypes:
1332 if comp not in util.compengines.supportedbundletypes:
1326 raise error.Abort(_('unknown stream compression type: %s')
1333 raise error.Abort(_('unknown stream compression type: %s')
1327 % comp)
1334 % comp)
1328 compengine = util.compengines.forbundletype(comp)
1335 compengine = util.compengines.forbundletype(comp)
1329 def chunkiter():
1336 def chunkiter():
1330 yield header
1337 yield header
1331 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1338 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1332 yield chunk
1339 yield chunk
1333 chunkiter = chunkiter()
1340 chunkiter = chunkiter()
1334
1341
1335 # parse the changegroup data, otherwise we will block
1342 # parse the changegroup data, otherwise we will block
1336 # in case of sshrepo because we don't know the end of the stream
1343 # in case of sshrepo because we don't know the end of the stream
1337 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1344 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1338
1345
1339 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1346 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1340 def handlechangegroup(op, inpart):
1347 def handlechangegroup(op, inpart):
1341 """apply a changegroup part on the repo
1348 """apply a changegroup part on the repo
1342
1349
1343 This is a very early implementation that will massive rework before being
1350 This is a very early implementation that will massive rework before being
1344 inflicted to any end-user.
1351 inflicted to any end-user.
1345 """
1352 """
1346 # Make sure we trigger a transaction creation
1353 # Make sure we trigger a transaction creation
1347 #
1354 #
1348 # The addchangegroup function will get a transaction object by itself, but
1355 # The addchangegroup function will get a transaction object by itself, but
1349 # we need to make sure we trigger the creation of a transaction object used
1356 # we need to make sure we trigger the creation of a transaction object used
1350 # for the whole processing scope.
1357 # for the whole processing scope.
1351 op.gettransaction()
1358 op.gettransaction()
1352 unpackerversion = inpart.params.get('version', '01')
1359 unpackerversion = inpart.params.get('version', '01')
1353 # We should raise an appropriate exception here
1360 # We should raise an appropriate exception here
1354 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1361 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1355 # the source and url passed here are overwritten by the one contained in
1362 # the source and url passed here are overwritten by the one contained in
1356 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1363 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1357 nbchangesets = None
1364 nbchangesets = None
1358 if 'nbchanges' in inpart.params:
1365 if 'nbchanges' in inpart.params:
1359 nbchangesets = int(inpart.params.get('nbchanges'))
1366 nbchangesets = int(inpart.params.get('nbchanges'))
1360 if ('treemanifest' in inpart.params and
1367 if ('treemanifest' in inpart.params and
1361 'treemanifest' not in op.repo.requirements):
1368 'treemanifest' not in op.repo.requirements):
1362 if len(op.repo.changelog) != 0:
1369 if len(op.repo.changelog) != 0:
1363 raise error.Abort(_(
1370 raise error.Abort(_(
1364 "bundle contains tree manifests, but local repo is "
1371 "bundle contains tree manifests, but local repo is "
1365 "non-empty and does not use tree manifests"))
1372 "non-empty and does not use tree manifests"))
1366 op.repo.requirements.add('treemanifest')
1373 op.repo.requirements.add('treemanifest')
1367 op.repo._applyopenerreqs()
1374 op.repo._applyopenerreqs()
1368 op.repo._writerequirements()
1375 op.repo._writerequirements()
1369 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1376 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1370 op.records.add('changegroup', {'return': ret})
1377 op.records.add('changegroup', {'return': ret})
1371 if op.reply is not None:
1378 if op.reply is not None:
1372 # This is definitely not the final form of this
1379 # This is definitely not the final form of this
1373 # return. But one need to start somewhere.
1380 # return. But one need to start somewhere.
1374 part = op.reply.newpart('reply:changegroup', mandatory=False)
1381 part = op.reply.newpart('reply:changegroup', mandatory=False)
1375 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1382 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1376 part.addparam('return', '%i' % ret, mandatory=False)
1383 part.addparam('return', '%i' % ret, mandatory=False)
1377 assert not inpart.read()
1384 assert not inpart.read()
1378
1385
1379 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1386 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1380 ['digest:%s' % k for k in util.DIGESTS.keys()])
1387 ['digest:%s' % k for k in util.DIGESTS.keys()])
1381 @parthandler('remote-changegroup', _remotechangegroupparams)
1388 @parthandler('remote-changegroup', _remotechangegroupparams)
1382 def handleremotechangegroup(op, inpart):
1389 def handleremotechangegroup(op, inpart):
1383 """apply a bundle10 on the repo, given an url and validation information
1390 """apply a bundle10 on the repo, given an url and validation information
1384
1391
1385 All the information about the remote bundle to import are given as
1392 All the information about the remote bundle to import are given as
1386 parameters. The parameters include:
1393 parameters. The parameters include:
1387 - url: the url to the bundle10.
1394 - url: the url to the bundle10.
1388 - size: the bundle10 file size. It is used to validate what was
1395 - size: the bundle10 file size. It is used to validate what was
1389 retrieved by the client matches the server knowledge about the bundle.
1396 retrieved by the client matches the server knowledge about the bundle.
1390 - digests: a space separated list of the digest types provided as
1397 - digests: a space separated list of the digest types provided as
1391 parameters.
1398 parameters.
1392 - digest:<digest-type>: the hexadecimal representation of the digest with
1399 - digest:<digest-type>: the hexadecimal representation of the digest with
1393 that name. Like the size, it is used to validate what was retrieved by
1400 that name. Like the size, it is used to validate what was retrieved by
1394 the client matches what the server knows about the bundle.
1401 the client matches what the server knows about the bundle.
1395
1402
1396 When multiple digest types are given, all of them are checked.
1403 When multiple digest types are given, all of them are checked.
1397 """
1404 """
1398 try:
1405 try:
1399 raw_url = inpart.params['url']
1406 raw_url = inpart.params['url']
1400 except KeyError:
1407 except KeyError:
1401 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1408 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1402 parsed_url = util.url(raw_url)
1409 parsed_url = util.url(raw_url)
1403 if parsed_url.scheme not in capabilities['remote-changegroup']:
1410 if parsed_url.scheme not in capabilities['remote-changegroup']:
1404 raise error.Abort(_('remote-changegroup does not support %s urls') %
1411 raise error.Abort(_('remote-changegroup does not support %s urls') %
1405 parsed_url.scheme)
1412 parsed_url.scheme)
1406
1413
1407 try:
1414 try:
1408 size = int(inpart.params['size'])
1415 size = int(inpart.params['size'])
1409 except ValueError:
1416 except ValueError:
1410 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1417 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1411 % 'size')
1418 % 'size')
1412 except KeyError:
1419 except KeyError:
1413 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1420 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1414
1421
1415 digests = {}
1422 digests = {}
1416 for typ in inpart.params.get('digests', '').split():
1423 for typ in inpart.params.get('digests', '').split():
1417 param = 'digest:%s' % typ
1424 param = 'digest:%s' % typ
1418 try:
1425 try:
1419 value = inpart.params[param]
1426 value = inpart.params[param]
1420 except KeyError:
1427 except KeyError:
1421 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1428 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1422 param)
1429 param)
1423 digests[typ] = value
1430 digests[typ] = value
1424
1431
1425 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1432 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1426
1433
1427 # Make sure we trigger a transaction creation
1434 # Make sure we trigger a transaction creation
1428 #
1435 #
1429 # The addchangegroup function will get a transaction object by itself, but
1436 # The addchangegroup function will get a transaction object by itself, but
1430 # we need to make sure we trigger the creation of a transaction object used
1437 # we need to make sure we trigger the creation of a transaction object used
1431 # for the whole processing scope.
1438 # for the whole processing scope.
1432 op.gettransaction()
1439 op.gettransaction()
1433 from . import exchange
1440 from . import exchange
1434 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1441 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1435 if not isinstance(cg, changegroup.cg1unpacker):
1442 if not isinstance(cg, changegroup.cg1unpacker):
1436 raise error.Abort(_('%s: not a bundle version 1.0') %
1443 raise error.Abort(_('%s: not a bundle version 1.0') %
1437 util.hidepassword(raw_url))
1444 util.hidepassword(raw_url))
1438 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1445 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1439 op.records.add('changegroup', {'return': ret})
1446 op.records.add('changegroup', {'return': ret})
1440 if op.reply is not None:
1447 if op.reply is not None:
1441 # This is definitely not the final form of this
1448 # This is definitely not the final form of this
1442 # return. But one need to start somewhere.
1449 # return. But one need to start somewhere.
1443 part = op.reply.newpart('reply:changegroup')
1450 part = op.reply.newpart('reply:changegroup')
1444 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1451 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1445 part.addparam('return', '%i' % ret, mandatory=False)
1452 part.addparam('return', '%i' % ret, mandatory=False)
1446 try:
1453 try:
1447 real_part.validate()
1454 real_part.validate()
1448 except error.Abort as e:
1455 except error.Abort as e:
1449 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1456 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1450 (util.hidepassword(raw_url), str(e)))
1457 (util.hidepassword(raw_url), str(e)))
1451 assert not inpart.read()
1458 assert not inpart.read()
1452
1459
1453 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1460 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1454 def handlereplychangegroup(op, inpart):
1461 def handlereplychangegroup(op, inpart):
1455 ret = int(inpart.params['return'])
1462 ret = int(inpart.params['return'])
1456 replyto = int(inpart.params['in-reply-to'])
1463 replyto = int(inpart.params['in-reply-to'])
1457 op.records.add('changegroup', {'return': ret}, replyto)
1464 op.records.add('changegroup', {'return': ret}, replyto)
1458
1465
1459 @parthandler('check:heads')
1466 @parthandler('check:heads')
1460 def handlecheckheads(op, inpart):
1467 def handlecheckheads(op, inpart):
1461 """check that head of the repo did not change
1468 """check that head of the repo did not change
1462
1469
1463 This is used to detect a push race when using unbundle.
1470 This is used to detect a push race when using unbundle.
1464 This replaces the "heads" argument of unbundle."""
1471 This replaces the "heads" argument of unbundle."""
1465 h = inpart.read(20)
1472 h = inpart.read(20)
1466 heads = []
1473 heads = []
1467 while len(h) == 20:
1474 while len(h) == 20:
1468 heads.append(h)
1475 heads.append(h)
1469 h = inpart.read(20)
1476 h = inpart.read(20)
1470 assert not h
1477 assert not h
1471 # Trigger a transaction so that we are guaranteed to have the lock now.
1478 # Trigger a transaction so that we are guaranteed to have the lock now.
1472 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1479 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1473 op.gettransaction()
1480 op.gettransaction()
1474 if sorted(heads) != sorted(op.repo.heads()):
1481 if sorted(heads) != sorted(op.repo.heads()):
1475 raise error.PushRaced('repository changed while pushing - '
1482 raise error.PushRaced('repository changed while pushing - '
1476 'please try again')
1483 'please try again')
1477
1484
1478 @parthandler('output')
1485 @parthandler('output')
1479 def handleoutput(op, inpart):
1486 def handleoutput(op, inpart):
1480 """forward output captured on the server to the client"""
1487 """forward output captured on the server to the client"""
1481 for line in inpart.read().splitlines():
1488 for line in inpart.read().splitlines():
1482 op.ui.status(_('remote: %s\n') % line)
1489 op.ui.status(_('remote: %s\n') % line)
1483
1490
1484 @parthandler('replycaps')
1491 @parthandler('replycaps')
1485 def handlereplycaps(op, inpart):
1492 def handlereplycaps(op, inpart):
1486 """Notify that a reply bundle should be created
1493 """Notify that a reply bundle should be created
1487
1494
1488 The payload contains the capabilities information for the reply"""
1495 The payload contains the capabilities information for the reply"""
1489 caps = decodecaps(inpart.read())
1496 caps = decodecaps(inpart.read())
1490 if op.reply is None:
1497 if op.reply is None:
1491 op.reply = bundle20(op.ui, caps)
1498 op.reply = bundle20(op.ui, caps)
1492
1499
1493 class AbortFromPart(error.Abort):
1500 class AbortFromPart(error.Abort):
1494 """Sub-class of Abort that denotes an error from a bundle2 part."""
1501 """Sub-class of Abort that denotes an error from a bundle2 part."""
1495
1502
1496 @parthandler('error:abort', ('message', 'hint'))
1503 @parthandler('error:abort', ('message', 'hint'))
1497 def handleerrorabort(op, inpart):
1504 def handleerrorabort(op, inpart):
1498 """Used to transmit abort error over the wire"""
1505 """Used to transmit abort error over the wire"""
1499 raise AbortFromPart(inpart.params['message'],
1506 raise AbortFromPart(inpart.params['message'],
1500 hint=inpart.params.get('hint'))
1507 hint=inpart.params.get('hint'))
1501
1508
1502 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1509 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1503 'in-reply-to'))
1510 'in-reply-to'))
1504 def handleerrorpushkey(op, inpart):
1511 def handleerrorpushkey(op, inpart):
1505 """Used to transmit failure of a mandatory pushkey over the wire"""
1512 """Used to transmit failure of a mandatory pushkey over the wire"""
1506 kwargs = {}
1513 kwargs = {}
1507 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1514 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1508 value = inpart.params.get(name)
1515 value = inpart.params.get(name)
1509 if value is not None:
1516 if value is not None:
1510 kwargs[name] = value
1517 kwargs[name] = value
1511 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1518 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1512
1519
1513 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1520 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1514 def handleerrorunsupportedcontent(op, inpart):
1521 def handleerrorunsupportedcontent(op, inpart):
1515 """Used to transmit unknown content error over the wire"""
1522 """Used to transmit unknown content error over the wire"""
1516 kwargs = {}
1523 kwargs = {}
1517 parttype = inpart.params.get('parttype')
1524 parttype = inpart.params.get('parttype')
1518 if parttype is not None:
1525 if parttype is not None:
1519 kwargs['parttype'] = parttype
1526 kwargs['parttype'] = parttype
1520 params = inpart.params.get('params')
1527 params = inpart.params.get('params')
1521 if params is not None:
1528 if params is not None:
1522 kwargs['params'] = params.split('\0')
1529 kwargs['params'] = params.split('\0')
1523
1530
1524 raise error.BundleUnknownFeatureError(**kwargs)
1531 raise error.BundleUnknownFeatureError(**kwargs)
1525
1532
1526 @parthandler('error:pushraced', ('message',))
1533 @parthandler('error:pushraced', ('message',))
1527 def handleerrorpushraced(op, inpart):
1534 def handleerrorpushraced(op, inpart):
1528 """Used to transmit push race error over the wire"""
1535 """Used to transmit push race error over the wire"""
1529 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1536 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1530
1537
1531 @parthandler('listkeys', ('namespace',))
1538 @parthandler('listkeys', ('namespace',))
1532 def handlelistkeys(op, inpart):
1539 def handlelistkeys(op, inpart):
1533 """retrieve pushkey namespace content stored in a bundle2"""
1540 """retrieve pushkey namespace content stored in a bundle2"""
1534 namespace = inpart.params['namespace']
1541 namespace = inpart.params['namespace']
1535 r = pushkey.decodekeys(inpart.read())
1542 r = pushkey.decodekeys(inpart.read())
1536 op.records.add('listkeys', (namespace, r))
1543 op.records.add('listkeys', (namespace, r))
1537
1544
1538 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1545 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1539 def handlepushkey(op, inpart):
1546 def handlepushkey(op, inpart):
1540 """process a pushkey request"""
1547 """process a pushkey request"""
1541 dec = pushkey.decode
1548 dec = pushkey.decode
1542 namespace = dec(inpart.params['namespace'])
1549 namespace = dec(inpart.params['namespace'])
1543 key = dec(inpart.params['key'])
1550 key = dec(inpart.params['key'])
1544 old = dec(inpart.params['old'])
1551 old = dec(inpart.params['old'])
1545 new = dec(inpart.params['new'])
1552 new = dec(inpart.params['new'])
1546 # Grab the transaction to ensure that we have the lock before performing the
1553 # Grab the transaction to ensure that we have the lock before performing the
1547 # pushkey.
1554 # pushkey.
1548 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1555 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1549 op.gettransaction()
1556 op.gettransaction()
1550 ret = op.repo.pushkey(namespace, key, old, new)
1557 ret = op.repo.pushkey(namespace, key, old, new)
1551 record = {'namespace': namespace,
1558 record = {'namespace': namespace,
1552 'key': key,
1559 'key': key,
1553 'old': old,
1560 'old': old,
1554 'new': new}
1561 'new': new}
1555 op.records.add('pushkey', record)
1562 op.records.add('pushkey', record)
1556 if op.reply is not None:
1563 if op.reply is not None:
1557 rpart = op.reply.newpart('reply:pushkey')
1564 rpart = op.reply.newpart('reply:pushkey')
1558 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1565 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1559 rpart.addparam('return', '%i' % ret, mandatory=False)
1566 rpart.addparam('return', '%i' % ret, mandatory=False)
1560 if inpart.mandatory and not ret:
1567 if inpart.mandatory and not ret:
1561 kwargs = {}
1568 kwargs = {}
1562 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1569 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1563 if key in inpart.params:
1570 if key in inpart.params:
1564 kwargs[key] = inpart.params[key]
1571 kwargs[key] = inpart.params[key]
1565 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1572 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1566
1573
1567 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1574 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1568 def handlepushkeyreply(op, inpart):
1575 def handlepushkeyreply(op, inpart):
1569 """retrieve the result of a pushkey request"""
1576 """retrieve the result of a pushkey request"""
1570 ret = int(inpart.params['return'])
1577 ret = int(inpart.params['return'])
1571 partid = int(inpart.params['in-reply-to'])
1578 partid = int(inpart.params['in-reply-to'])
1572 op.records.add('pushkey', {'return': ret}, partid)
1579 op.records.add('pushkey', {'return': ret}, partid)
1573
1580
1574 @parthandler('obsmarkers')
1581 @parthandler('obsmarkers')
1575 def handleobsmarker(op, inpart):
1582 def handleobsmarker(op, inpart):
1576 """add a stream of obsmarkers to the repo"""
1583 """add a stream of obsmarkers to the repo"""
1577 tr = op.gettransaction()
1584 tr = op.gettransaction()
1578 markerdata = inpart.read()
1585 markerdata = inpart.read()
1579 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1586 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1580 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1587 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1581 % len(markerdata))
1588 % len(markerdata))
1582 # The mergemarkers call will crash if marker creation is not enabled.
1589 # The mergemarkers call will crash if marker creation is not enabled.
1583 # we want to avoid this if the part is advisory.
1590 # we want to avoid this if the part is advisory.
1584 if not inpart.mandatory and op.repo.obsstore.readonly:
1591 if not inpart.mandatory and op.repo.obsstore.readonly:
1585 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1592 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1586 return
1593 return
1587 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1594 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1588 if new:
1595 if new:
1589 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1596 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1590 op.records.add('obsmarkers', {'new': new})
1597 op.records.add('obsmarkers', {'new': new})
1591 if op.reply is not None:
1598 if op.reply is not None:
1592 rpart = op.reply.newpart('reply:obsmarkers')
1599 rpart = op.reply.newpart('reply:obsmarkers')
1593 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1600 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1594 rpart.addparam('new', '%i' % new, mandatory=False)
1601 rpart.addparam('new', '%i' % new, mandatory=False)
1595
1602
1596
1603
1597 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1604 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1598 def handleobsmarkerreply(op, inpart):
1605 def handleobsmarkerreply(op, inpart):
1599 """retrieve the result of a pushkey request"""
1606 """retrieve the result of a pushkey request"""
1600 ret = int(inpart.params['new'])
1607 ret = int(inpart.params['new'])
1601 partid = int(inpart.params['in-reply-to'])
1608 partid = int(inpart.params['in-reply-to'])
1602 op.records.add('obsmarkers', {'new': ret}, partid)
1609 op.records.add('obsmarkers', {'new': ret}, partid)
1603
1610
1604 @parthandler('hgtagsfnodes')
1611 @parthandler('hgtagsfnodes')
1605 def handlehgtagsfnodes(op, inpart):
1612 def handlehgtagsfnodes(op, inpart):
1606 """Applies .hgtags fnodes cache entries to the local repo.
1613 """Applies .hgtags fnodes cache entries to the local repo.
1607
1614
1608 Payload is pairs of 20 byte changeset nodes and filenodes.
1615 Payload is pairs of 20 byte changeset nodes and filenodes.
1609 """
1616 """
1610 # Grab the transaction so we ensure that we have the lock at this point.
1617 # Grab the transaction so we ensure that we have the lock at this point.
1611 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1618 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1612 op.gettransaction()
1619 op.gettransaction()
1613 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1620 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1614
1621
1615 count = 0
1622 count = 0
1616 while True:
1623 while True:
1617 node = inpart.read(20)
1624 node = inpart.read(20)
1618 fnode = inpart.read(20)
1625 fnode = inpart.read(20)
1619 if len(node) < 20 or len(fnode) < 20:
1626 if len(node) < 20 or len(fnode) < 20:
1620 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1627 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1621 break
1628 break
1622 cache.setfnode(node, fnode)
1629 cache.setfnode(node, fnode)
1623 count += 1
1630 count += 1
1624
1631
1625 cache.write()
1632 cache.write()
1626 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1633 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
General Comments 0
You need to be logged in to leave comments. Login now