bundle2: only emit compressed chunks if they have data...
Gregory Szorc
r30177:9626022f default
@@ -1,1622 +1,1626
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomic packet to transmit a set of
9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 payloads in an application agnostic way. It consists of a sequence of "parts"
10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is structured as follows:
17 The format is structured as follows:
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 The binary format
24 The binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follows:
32 Binary format is as follows:
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty names are obviously forbidden.
46 Empty names are obviously forbidden.
47
47
48 The name MUST start with a letter. If this first letter is lower case, the
48 The name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However, when the first
49 parameter is advisory and can be safely ignored. However, when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if it is not able to process it.
51 stop if it is not able to process it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allows easy human inspection of a bundle2 header in case of
57 - Textual data allows easy human inspection of a bundle2 header in case of
58 trouble.
58 trouble.
59
59
60 Any application level options MUST go into a bundle2 part instead.
60 Any application level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follows:
65 Binary format is as follows:
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part header. When the header is empty
69 The total number of Bytes used by the part header. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two pieces of
74 The header defines how to interpret the part. It contains two pieces of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route to an application level handler that can
77 The part type is used to route to an application level handler that can
78 interpret the payload.
78 interpret the payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is as follows:
84 The binary format of the header is as follows:
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 A part's parameters may have arbitrary content; the binary structure is::
95 A part's parameters may have arbitrary content; the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couples of bytes, where N is the total number of parameters. Each
105 N couples of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters come first, then the advisory ones.
114 Mandatory parameters come first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 The payload is a series of `<chunksize><chunkdata>`.
120 The payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` is plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` is plain bytes (as much as
123 `chunksize` says). The payload part is concluded by a zero size chunk.
123 `chunksize` says). The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handlers are registered
134 Each part is processed in order using a "part handler". Handlers are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the part type
138 part type is used to know if a part is mandatory or advisory. If the part type
139 contains any uppercase character, it is considered mandatory. When no handler is
139 contains any uppercase character, it is considered mandatory. When no handler is
140 known for a mandatory part, the process is aborted and an exception is raised.
140 known for a mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the parts read after an abort are processed. In the
143 channel usable. But none of the parts read after an abort are processed. In the
144 future, dropping the stream may become an option for channels we do not care to
144 future, dropping the stream may become an option for channels we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 from __future__ import absolute_import
148 from __future__ import absolute_import
149
149
150 import errno
150 import errno
151 import re
151 import re
152 import string
152 import string
153 import struct
153 import struct
154 import sys
154 import sys
155
155
156 from .i18n import _
156 from .i18n import _
157 from . import (
157 from . import (
158 changegroup,
158 changegroup,
159 error,
159 error,
160 obsolete,
160 obsolete,
161 pushkey,
161 pushkey,
162 pycompat,
162 pycompat,
163 tags,
163 tags,
164 url,
164 url,
165 util,
165 util,
166 )
166 )
167
167
168 urlerr = util.urlerr
168 urlerr = util.urlerr
169 urlreq = util.urlreq
169 urlreq = util.urlreq
170
170
171 _pack = struct.pack
171 _pack = struct.pack
172 _unpack = struct.unpack
172 _unpack = struct.unpack
173
173
174 _fstreamparamsize = '>i'
174 _fstreamparamsize = '>i'
175 _fpartheadersize = '>i'
175 _fpartheadersize = '>i'
176 _fparttypesize = '>B'
176 _fparttypesize = '>B'
177 _fpartid = '>I'
177 _fpartid = '>I'
178 _fpayloadsize = '>i'
178 _fpayloadsize = '>i'
179 _fpartparamcount = '>BB'
179 _fpartparamcount = '>BB'
180
180
181 preferedchunksize = 4096
181 preferedchunksize = 4096
182
182
183 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
183 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
184
184
185 def outdebug(ui, message):
185 def outdebug(ui, message):
186 """debug regarding output stream (bundling)"""
186 """debug regarding output stream (bundling)"""
187 if ui.configbool('devel', 'bundle2.debug', False):
187 if ui.configbool('devel', 'bundle2.debug', False):
188 ui.debug('bundle2-output: %s\n' % message)
188 ui.debug('bundle2-output: %s\n' % message)
189
189
190 def indebug(ui, message):
190 def indebug(ui, message):
191 """debug on input stream (unbundling)"""
191 """debug on input stream (unbundling)"""
192 if ui.configbool('devel', 'bundle2.debug', False):
192 if ui.configbool('devel', 'bundle2.debug', False):
193 ui.debug('bundle2-input: %s\n' % message)
193 ui.debug('bundle2-input: %s\n' % message)
194
194
195 def validateparttype(parttype):
195 def validateparttype(parttype):
196 """raise ValueError if a parttype contains invalid character"""
196 """raise ValueError if a parttype contains invalid character"""
197 if _parttypeforbidden.search(parttype):
197 if _parttypeforbidden.search(parttype):
198 raise ValueError(parttype)
198 raise ValueError(parttype)
199
199
200 def _makefpartparamsizes(nbparams):
200 def _makefpartparamsizes(nbparams):
201 """return a struct format to read part parameter sizes
201 """return a struct format to read part parameter sizes
202
202
203 The number of parameters is variable, so we need to build that format
203 The number of parameters is variable, so we need to build that format
204 dynamically.
204 dynamically.
205 """
205 """
206 return '>'+('BB'*nbparams)
206 return '>'+('BB'*nbparams)
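For example, with two parameters the function above builds the format '>BBBB'; unpacking a four-byte block with it yields the (key size, value size) couples described in the part header layout. Illustrative sketch only; the sample bytes are made up.

import struct

fmt = '>' + ('BB' * 2)   # what _makefpartparamsizes(2) returns: '>BBBB'
sizes = struct.unpack(fmt, b'\x03\x05\x04\x00')
# sizes == (3, 5, 4, 0): a 3-byte key with a 5-byte value,
# then a 4-byte key with an empty value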
207
207
208 parthandlermapping = {}
208 parthandlermapping = {}
209
209
210 def parthandler(parttype, params=()):
210 def parthandler(parttype, params=()):
211 """decorator that register a function as a bundle2 part handler
211 """decorator that register a function as a bundle2 part handler
212
212
213 eg::
213 eg::
214
214
215 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
215 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
216 def myparttypehandler(...):
216 def myparttypehandler(...):
217 '''process a part of type "my part".'''
217 '''process a part of type "my part".'''
218 ...
218 ...
219 """
219 """
220 validateparttype(parttype)
220 validateparttype(parttype)
221 def _decorator(func):
221 def _decorator(func):
222 lparttype = parttype.lower() # enforce lower case matching.
222 lparttype = parttype.lower() # enforce lower case matching.
223 assert lparttype not in parthandlermapping
223 assert lparttype not in parthandlermapping
224 parthandlermapping[lparttype] = func
224 parthandlermapping[lparttype] = func
225 func.params = frozenset(params)
225 func.params = frozenset(params)
226 return func
226 return func
227 return _decorator
227 return _decorator
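A hypothetical registration, mirroring the docstring example above: the decorator stores the handler under the lower-cased part type (matching is case insensitive; the case used on the wire only conveys mandatory vs advisory). The part type and handler below are invented for illustration only.

@parthandler('myparttype', ('mandatory', 'param', 'handled'))
def myparttypehandler(op, part):
    '''process a part of type "myparttype" (illustrative no-op)'''
    pass

# the handler is registered under the lower-cased name
assert 'myparttype' in parthandlermapping
assert myparttypehandler.params == frozenset(('mandatory', 'param', 'handled'))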
228
228
229 class unbundlerecords(object):
229 class unbundlerecords(object):
230 """keep record of what happens during and unbundle
230 """keep record of what happens during and unbundle
231
231
232 New records are added using `records.add('cat', obj)`, where 'cat' is a
232 New records are added using `records.add('cat', obj)`, where 'cat' is a
233 category of record and obj is an arbitrary object.
233 category of record and obj is an arbitrary object.
234
234
235 `records['cat']` will return all entries of this category 'cat'.
235 `records['cat']` will return all entries of this category 'cat'.
236
236
237 Iterating on the object itself will yield `('category', obj)` tuples
237 Iterating on the object itself will yield `('category', obj)` tuples
238 for all entries.
238 for all entries.
239
239
240 All iterations happen in chronological order.
240 All iterations happen in chronological order.
241 """
241 """
242
242
243 def __init__(self):
243 def __init__(self):
244 self._categories = {}
244 self._categories = {}
245 self._sequences = []
245 self._sequences = []
246 self._replies = {}
246 self._replies = {}
247
247
248 def add(self, category, entry, inreplyto=None):
248 def add(self, category, entry, inreplyto=None):
249 """add a new record of a given category.
249 """add a new record of a given category.
250
250
251 The entry can then be retrieved in the list returned by
251 The entry can then be retrieved in the list returned by
252 self['category']."""
252 self['category']."""
253 self._categories.setdefault(category, []).append(entry)
253 self._categories.setdefault(category, []).append(entry)
254 self._sequences.append((category, entry))
254 self._sequences.append((category, entry))
255 if inreplyto is not None:
255 if inreplyto is not None:
256 self.getreplies(inreplyto).add(category, entry)
256 self.getreplies(inreplyto).add(category, entry)
257
257
258 def getreplies(self, partid):
258 def getreplies(self, partid):
259 """get the records that are replies to a specific part"""
259 """get the records that are replies to a specific part"""
260 return self._replies.setdefault(partid, unbundlerecords())
260 return self._replies.setdefault(partid, unbundlerecords())
261
261
262 def __getitem__(self, cat):
262 def __getitem__(self, cat):
263 return tuple(self._categories.get(cat, ()))
263 return tuple(self._categories.get(cat, ()))
264
264
265 def __iter__(self):
265 def __iter__(self):
266 return iter(self._sequences)
266 return iter(self._sequences)
267
267
268 def __len__(self):
268 def __len__(self):
269 return len(self._sequences)
269 return len(self._sequences)
270
270
271 def __nonzero__(self):
271 def __nonzero__(self):
272 return bool(self._sequences)
272 return bool(self._sequences)
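A small illustration of the record API documented above; the categories and payloads are hypothetical.

records = unbundlerecords()
records.add('changegroup', {'return': 1})
records.add('output', 'remote: hello\n', inreplyto=0)
assert records['changegroup'] == ({'return': 1},)
# replies are grouped per originating part id
assert list(records.getreplies(0)) == [('output', 'remote: hello\n')]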
273
273
274 class bundleoperation(object):
274 class bundleoperation(object):
275 """an object that represents a single bundling process
275 """an object that represents a single bundling process
276
276
277 Its purpose is to carry unbundle-related objects and states.
277 Its purpose is to carry unbundle-related objects and states.
278
278
279 A new object should be created at the beginning of each bundle processing.
279 A new object should be created at the beginning of each bundle processing.
280 The object is to be returned by the processing function.
280 The object is to be returned by the processing function.
281
281
282 The object has very little content now; it will ultimately contain:
282 The object has very little content now; it will ultimately contain:
283 * an access to the repo the bundle is applied to,
283 * an access to the repo the bundle is applied to,
284 * a ui object,
284 * a ui object,
285 * a way to retrieve a transaction to add changes to the repo,
285 * a way to retrieve a transaction to add changes to the repo,
286 * a way to record the result of processing each part,
286 * a way to record the result of processing each part,
287 * a way to construct a bundle response when applicable.
287 * a way to construct a bundle response when applicable.
288 """
288 """
289
289
290 def __init__(self, repo, transactiongetter, captureoutput=True):
290 def __init__(self, repo, transactiongetter, captureoutput=True):
291 self.repo = repo
291 self.repo = repo
292 self.ui = repo.ui
292 self.ui = repo.ui
293 self.records = unbundlerecords()
293 self.records = unbundlerecords()
294 self.gettransaction = transactiongetter
294 self.gettransaction = transactiongetter
295 self.reply = None
295 self.reply = None
296 self.captureoutput = captureoutput
296 self.captureoutput = captureoutput
297
297
298 class TransactionUnavailable(RuntimeError):
298 class TransactionUnavailable(RuntimeError):
299 pass
299 pass
300
300
301 def _notransaction():
301 def _notransaction():
302 """default method to get a transaction while processing a bundle
302 """default method to get a transaction while processing a bundle
303
303
304 Raise an exception to highlight the fact that no transaction was expected
304 Raise an exception to highlight the fact that no transaction was expected
305 to be created"""
305 to be created"""
306 raise TransactionUnavailable()
306 raise TransactionUnavailable()
307
307
308 def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
308 def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
309 # transform me into unbundler.apply() as soon as the freeze is lifted
309 # transform me into unbundler.apply() as soon as the freeze is lifted
310 tr.hookargs['bundle2'] = '1'
310 tr.hookargs['bundle2'] = '1'
311 if source is not None and 'source' not in tr.hookargs:
311 if source is not None and 'source' not in tr.hookargs:
312 tr.hookargs['source'] = source
312 tr.hookargs['source'] = source
313 if url is not None and 'url' not in tr.hookargs:
313 if url is not None and 'url' not in tr.hookargs:
314 tr.hookargs['url'] = url
314 tr.hookargs['url'] = url
315 return processbundle(repo, unbundler, lambda: tr, op=op)
315 return processbundle(repo, unbundler, lambda: tr, op=op)
316
316
317 def processbundle(repo, unbundler, transactiongetter=None, op=None):
317 def processbundle(repo, unbundler, transactiongetter=None, op=None):
318 """This function process a bundle, apply effect to/from a repo
318 """This function process a bundle, apply effect to/from a repo
319
319
320 It iterates over each part then searches for and uses the proper handling
320 It iterates over each part then searches for and uses the proper handling
321 code to process the part. Parts are processed in order.
321 code to process the part. Parts are processed in order.
322
322
323 This is a very early version of this function that will be strongly reworked
323 This is a very early version of this function that will be strongly reworked
324 before final usage.
324 before final usage.
325
325
326 An unknown mandatory part will abort the process.
326 An unknown mandatory part will abort the process.
327
327
328 It is temporarily possible to provide a prebuilt bundleoperation to the
328 It is temporarily possible to provide a prebuilt bundleoperation to the
329 function. This is used to ensure output is properly propagated in case of
329 function. This is used to ensure output is properly propagated in case of
330 an error during the unbundling. This output capturing part will likely be
330 an error during the unbundling. This output capturing part will likely be
331 reworked and this ability will probably go away in the process.
331 reworked and this ability will probably go away in the process.
332 """
332 """
333 if op is None:
333 if op is None:
334 if transactiongetter is None:
334 if transactiongetter is None:
335 transactiongetter = _notransaction
335 transactiongetter = _notransaction
336 op = bundleoperation(repo, transactiongetter)
336 op = bundleoperation(repo, transactiongetter)
337 # todo:
337 # todo:
338 # - replace this with an init function soon.
338 # - replace this with an init function soon.
339 # - exception catching
339 # - exception catching
340 unbundler.params
340 unbundler.params
341 if repo.ui.debugflag:
341 if repo.ui.debugflag:
342 msg = ['bundle2-input-bundle:']
342 msg = ['bundle2-input-bundle:']
343 if unbundler.params:
343 if unbundler.params:
344 msg.append(' %i params' % len(unbundler.params))
344 msg.append(' %i params' % len(unbundler.params))
345 if op.gettransaction is None:
345 if op.gettransaction is None:
346 msg.append(' no-transaction')
346 msg.append(' no-transaction')
347 else:
347 else:
348 msg.append(' with-transaction')
348 msg.append(' with-transaction')
349 msg.append('\n')
349 msg.append('\n')
350 repo.ui.debug(''.join(msg))
350 repo.ui.debug(''.join(msg))
351 iterparts = enumerate(unbundler.iterparts())
351 iterparts = enumerate(unbundler.iterparts())
352 part = None
352 part = None
353 nbpart = 0
353 nbpart = 0
354 try:
354 try:
355 for nbpart, part in iterparts:
355 for nbpart, part in iterparts:
356 _processpart(op, part)
356 _processpart(op, part)
357 except Exception as exc:
357 except Exception as exc:
358 for nbpart, part in iterparts:
358 for nbpart, part in iterparts:
359 # consume the bundle content
359 # consume the bundle content
360 part.seek(0, 2)
360 part.seek(0, 2)
361 # Small hack to let caller code distinguish exceptions from bundle2
361 # Small hack to let caller code distinguish exceptions from bundle2
362 # processing from processing the old format. This is mostly
362 # processing from processing the old format. This is mostly
363 # needed to handle different return codes to unbundle according to the
363 # needed to handle different return codes to unbundle according to the
364 # type of bundle. We should probably clean up or drop this return code
364 # type of bundle. We should probably clean up or drop this return code
365 # craziness in a future version.
365 # craziness in a future version.
366 exc.duringunbundle2 = True
366 exc.duringunbundle2 = True
367 salvaged = []
367 salvaged = []
368 replycaps = None
368 replycaps = None
369 if op.reply is not None:
369 if op.reply is not None:
370 salvaged = op.reply.salvageoutput()
370 salvaged = op.reply.salvageoutput()
371 replycaps = op.reply.capabilities
371 replycaps = op.reply.capabilities
372 exc._replycaps = replycaps
372 exc._replycaps = replycaps
373 exc._bundle2salvagedoutput = salvaged
373 exc._bundle2salvagedoutput = salvaged
374 raise
374 raise
375 finally:
375 finally:
376 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
376 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
377
377
378 return op
378 return op
379
379
380 def _processpart(op, part):
380 def _processpart(op, part):
381 """process a single part from a bundle
381 """process a single part from a bundle
382
382
383 The part is guaranteed to have been fully consumed when the function exits
383 The part is guaranteed to have been fully consumed when the function exits
384 (even if an exception is raised)."""
384 (even if an exception is raised)."""
385 status = 'unknown' # used by debug output
385 status = 'unknown' # used by debug output
386 hardabort = False
386 hardabort = False
387 try:
387 try:
388 try:
388 try:
389 handler = parthandlermapping.get(part.type)
389 handler = parthandlermapping.get(part.type)
390 if handler is None:
390 if handler is None:
391 status = 'unsupported-type'
391 status = 'unsupported-type'
392 raise error.BundleUnknownFeatureError(parttype=part.type)
392 raise error.BundleUnknownFeatureError(parttype=part.type)
393 indebug(op.ui, 'found a handler for part %r' % part.type)
393 indebug(op.ui, 'found a handler for part %r' % part.type)
394 unknownparams = part.mandatorykeys - handler.params
394 unknownparams = part.mandatorykeys - handler.params
395 if unknownparams:
395 if unknownparams:
396 unknownparams = list(unknownparams)
396 unknownparams = list(unknownparams)
397 unknownparams.sort()
397 unknownparams.sort()
398 status = 'unsupported-params (%s)' % unknownparams
398 status = 'unsupported-params (%s)' % unknownparams
399 raise error.BundleUnknownFeatureError(parttype=part.type,
399 raise error.BundleUnknownFeatureError(parttype=part.type,
400 params=unknownparams)
400 params=unknownparams)
401 status = 'supported'
401 status = 'supported'
402 except error.BundleUnknownFeatureError as exc:
402 except error.BundleUnknownFeatureError as exc:
403 if part.mandatory: # mandatory parts
403 if part.mandatory: # mandatory parts
404 raise
404 raise
405 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
405 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
406 return # skip to part processing
406 return # skip to part processing
407 finally:
407 finally:
408 if op.ui.debugflag:
408 if op.ui.debugflag:
409 msg = ['bundle2-input-part: "%s"' % part.type]
409 msg = ['bundle2-input-part: "%s"' % part.type]
410 if not part.mandatory:
410 if not part.mandatory:
411 msg.append(' (advisory)')
411 msg.append(' (advisory)')
412 nbmp = len(part.mandatorykeys)
412 nbmp = len(part.mandatorykeys)
413 nbap = len(part.params) - nbmp
413 nbap = len(part.params) - nbmp
414 if nbmp or nbap:
414 if nbmp or nbap:
415 msg.append(' (params:')
415 msg.append(' (params:')
416 if nbmp:
416 if nbmp:
417 msg.append(' %i mandatory' % nbmp)
417 msg.append(' %i mandatory' % nbmp)
418 if nbap:
418 if nbap:
419 msg.append(' %i advisory' % nbap)
419 msg.append(' %i advisory' % nbap)
420 msg.append(')')
420 msg.append(')')
421 msg.append(' %s\n' % status)
421 msg.append(' %s\n' % status)
422 op.ui.debug(''.join(msg))
422 op.ui.debug(''.join(msg))
423
423
424 # handler is called outside the above try block so that we don't
424 # handler is called outside the above try block so that we don't
425 # risk catching KeyErrors from anything other than the
425 # risk catching KeyErrors from anything other than the
426 # parthandlermapping lookup (any KeyError raised by handler()
426 # parthandlermapping lookup (any KeyError raised by handler()
427 # itself represents a defect of a different variety).
427 # itself represents a defect of a different variety).
428 output = None
428 output = None
429 if op.captureoutput and op.reply is not None:
429 if op.captureoutput and op.reply is not None:
430 op.ui.pushbuffer(error=True, subproc=True)
430 op.ui.pushbuffer(error=True, subproc=True)
431 output = ''
431 output = ''
432 try:
432 try:
433 handler(op, part)
433 handler(op, part)
434 finally:
434 finally:
435 if output is not None:
435 if output is not None:
436 output = op.ui.popbuffer()
436 output = op.ui.popbuffer()
437 if output:
437 if output:
438 outpart = op.reply.newpart('output', data=output,
438 outpart = op.reply.newpart('output', data=output,
439 mandatory=False)
439 mandatory=False)
440 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
440 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
441 # If exiting or interrupted, do not attempt to seek the stream in the
441 # If exiting or interrupted, do not attempt to seek the stream in the
442 # finally block below. This makes abort faster.
442 # finally block below. This makes abort faster.
443 except (SystemExit, KeyboardInterrupt):
443 except (SystemExit, KeyboardInterrupt):
444 hardabort = True
444 hardabort = True
445 raise
445 raise
446 finally:
446 finally:
447 # consume the part content to not corrupt the stream.
447 # consume the part content to not corrupt the stream.
448 if not hardabort:
448 if not hardabort:
449 part.seek(0, 2)
449 part.seek(0, 2)
450
450
451
451
452 def decodecaps(blob):
452 def decodecaps(blob):
453 """decode a bundle2 caps bytes blob into a dictionary
453 """decode a bundle2 caps bytes blob into a dictionary
454
454
455 The blob is a list of capabilities (one per line).
455 The blob is a list of capabilities (one per line).
456 Capabilities may have values using a line of the form::
456 Capabilities may have values using a line of the form::
457
457
458 capability=value1,value2,value3
458 capability=value1,value2,value3
459
459
460 The values are always a list."""
460 The values are always a list."""
461 caps = {}
461 caps = {}
462 for line in blob.splitlines():
462 for line in blob.splitlines():
463 if not line:
463 if not line:
464 continue
464 continue
465 if '=' not in line:
465 if '=' not in line:
466 key, vals = line, ()
466 key, vals = line, ()
467 else:
467 else:
468 key, vals = line.split('=', 1)
468 key, vals = line.split('=', 1)
469 vals = vals.split(',')
469 vals = vals.split(',')
470 key = urlreq.unquote(key)
470 key = urlreq.unquote(key)
471 vals = [urlreq.unquote(v) for v in vals]
471 vals = [urlreq.unquote(v) for v in vals]
472 caps[key] = vals
472 caps[key] = vals
473 return caps
473 return caps
474
474
475 def encodecaps(caps):
475 def encodecaps(caps):
476 """encode a bundle2 caps dictionary into a bytes blob"""
476 """encode a bundle2 caps dictionary into a bytes blob"""
477 chunks = []
477 chunks = []
478 for ca in sorted(caps):
478 for ca in sorted(caps):
479 vals = caps[ca]
479 vals = caps[ca]
480 ca = urlreq.quote(ca)
480 ca = urlreq.quote(ca)
481 vals = [urlreq.quote(v) for v in vals]
481 vals = [urlreq.quote(v) for v in vals]
482 if vals:
482 if vals:
483 ca = "%s=%s" % (ca, ','.join(vals))
483 ca = "%s=%s" % (ca, ','.join(vals))
484 chunks.append(ca)
484 chunks.append(ca)
485 return '\n'.join(chunks)
485 return '\n'.join(chunks)
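For instance, the capabilities blob round-trips as follows: keys are sorted, values are comma separated, and decoding always returns lists. The capability names below are chosen for illustration.

caps = {'HG20': [], 'changegroup': ['01', '02']}
blob = encodecaps(caps)
# blob == 'HG20\nchangegroup=01,02'
assert decodecaps(blob) == {'HG20': [], 'changegroup': ['01', '02']}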
486
486
487 bundletypes = {
487 bundletypes = {
488 "": ("", None), # only when using unbundle on ssh and old http servers
488 "": ("", None), # only when using unbundle on ssh and old http servers
489 # since the unification ssh accepts a header but there
489 # since the unification ssh accepts a header but there
490 # is no capability signaling it.
490 # is no capability signaling it.
491 "HG20": (), # special-cased below
491 "HG20": (), # special-cased below
492 "HG10UN": ("HG10UN", None),
492 "HG10UN": ("HG10UN", None),
493 "HG10BZ": ("HG10", 'BZ'),
493 "HG10BZ": ("HG10", 'BZ'),
494 "HG10GZ": ("HG10GZ", 'GZ'),
494 "HG10GZ": ("HG10GZ", 'GZ'),
495 }
495 }
496
496
497 # hgweb uses this list to communicate its preferred type
497 # hgweb uses this list to communicate its preferred type
498 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
498 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
499
499
500 class bundle20(object):
500 class bundle20(object):
501 """represent an outgoing bundle2 container
501 """represent an outgoing bundle2 container
502
502
503 Use the `addparam` method to add a stream level parameter, and `newpart` to
503 Use the `addparam` method to add a stream level parameter, and `newpart` to
504 populate it. Then call `getchunks` to retrieve all the binary chunks of
504 populate it. Then call `getchunks` to retrieve all the binary chunks of
505 data that compose the bundle2 container."""
505 data that compose the bundle2 container."""
506
506
507 _magicstring = 'HG20'
507 _magicstring = 'HG20'
508
508
509 def __init__(self, ui, capabilities=()):
509 def __init__(self, ui, capabilities=()):
510 self.ui = ui
510 self.ui = ui
511 self._params = []
511 self._params = []
512 self._parts = []
512 self._parts = []
513 self.capabilities = dict(capabilities)
513 self.capabilities = dict(capabilities)
514 self._compressor = util.compressors[None]()
514 self._compressor = util.compressors[None]()
515
515
516 def setcompression(self, alg):
516 def setcompression(self, alg):
517 """setup core part compression to <alg>"""
517 """setup core part compression to <alg>"""
518 if alg is None:
518 if alg is None:
519 return
519 return
520 assert not any(n.lower() == 'compression' for n, v in self._params)
520 assert not any(n.lower() == 'compression' for n, v in self._params)
521 self.addparam('Compression', alg)
521 self.addparam('Compression', alg)
522 self._compressor = util.compressors[alg]()
522 self._compressor = util.compressors[alg]()
523
523
524 @property
524 @property
525 def nbparts(self):
525 def nbparts(self):
526 """total number of parts added to the bundler"""
526 """total number of parts added to the bundler"""
527 return len(self._parts)
527 return len(self._parts)
528
528
529 # methods used to define the bundle2 content
529 # methods used to define the bundle2 content
530 def addparam(self, name, value=None):
530 def addparam(self, name, value=None):
531 """add a stream level parameter"""
531 """add a stream level parameter"""
532 if not name:
532 if not name:
533 raise ValueError('empty parameter name')
533 raise ValueError('empty parameter name')
534 if name[0] not in string.letters:
534 if name[0] not in string.letters:
535 raise ValueError('non letter first character: %r' % name)
535 raise ValueError('non letter first character: %r' % name)
536 self._params.append((name, value))
536 self._params.append((name, value))
537
537
538 def addpart(self, part):
538 def addpart(self, part):
539 """add a new part to the bundle2 container
539 """add a new part to the bundle2 container
540
540
541 Parts contain the actual application payload.
541 Parts contain the actual application payload.
542 assert part.id is None
542 assert part.id is None
543 part.id = len(self._parts) # very cheap counter
543 part.id = len(self._parts) # very cheap counter
544 self._parts.append(part)
544 self._parts.append(part)
545
545
546 def newpart(self, typeid, *args, **kwargs):
546 def newpart(self, typeid, *args, **kwargs):
547 """create a new part and add it to the containers
547 """create a new part and add it to the containers
548
548
549 The part is directly added to the container. For now, this means
549 The part is directly added to the container. For now, this means
550 that any failure to properly initialize the part after calling
550 that any failure to properly initialize the part after calling
551 ``newpart`` should result in a failure of the whole bundling process.
551 ``newpart`` should result in a failure of the whole bundling process.
552
552
553 You can still fall back to manually creating and adding the part if you
553 You can still fall back to manually creating and adding the part if you
554 need better control."""
554 need better control."""
555 part = bundlepart(typeid, *args, **kwargs)
555 part = bundlepart(typeid, *args, **kwargs)
556 self.addpart(part)
556 self.addpart(part)
557 return part
557 return part
558
558
559 # methods used to generate the bundle2 stream
559 # methods used to generate the bundle2 stream
560 def getchunks(self):
560 def getchunks(self):
561 if self.ui.debugflag:
561 if self.ui.debugflag:
562 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
562 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
563 if self._params:
563 if self._params:
564 msg.append(' (%i params)' % len(self._params))
564 msg.append(' (%i params)' % len(self._params))
565 msg.append(' %i parts total\n' % len(self._parts))
565 msg.append(' %i parts total\n' % len(self._parts))
566 self.ui.debug(''.join(msg))
566 self.ui.debug(''.join(msg))
567 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
567 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
568 yield self._magicstring
568 yield self._magicstring
569 param = self._paramchunk()
569 param = self._paramchunk()
570 outdebug(self.ui, 'bundle parameter: %s' % param)
570 outdebug(self.ui, 'bundle parameter: %s' % param)
571 yield _pack(_fstreamparamsize, len(param))
571 yield _pack(_fstreamparamsize, len(param))
572 if param:
572 if param:
573 yield param
573 yield param
574 # starting compression
574 # starting compression
575 for chunk in self._getcorechunk():
575 for chunk in self._getcorechunk():
576 yield self._compressor.compress(chunk)
576 data = self._compressor.compress(chunk)
577 if data:
578 yield data
577 yield self._compressor.flush()
579 yield self._compressor.flush()
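The guard added above matters because streaming compressors buffer input internally: compress() frequently returns an empty string while data accumulates, and yielding those empty strings only creates pointless work for downstream consumers of the generator. flush() is still yielded unconditionally so buffered data and the stream trailer are emitted. A standalone sketch of the same pattern with zlib (illustrative only, not Mercurial's compression engine API):

import zlib

def compressedchunks(chunks):
    compressor = zlib.compressobj()
    for chunk in chunks:
        data = compressor.compress(chunk)
        if data:              # often b'' while the compressor buffers
            yield data
    yield compressor.flush()  # always emit the buffered tail

# most of these tiny inputs produce no immediate compressed output
out = list(compressedchunks([b'x' * 10] * 100))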
578
580
579 def _paramchunk(self):
581 def _paramchunk(self):
580 """return a encoded version of all stream parameters"""
582 """return a encoded version of all stream parameters"""
581 blocks = []
583 blocks = []
582 for par, value in self._params:
584 for par, value in self._params:
583 par = urlreq.quote(par)
585 par = urlreq.quote(par)
584 if value is not None:
586 if value is not None:
585 value = urlreq.quote(value)
587 value = urlreq.quote(value)
586 par = '%s=%s' % (par, value)
588 par = '%s=%s' % (par, value)
587 blocks.append(par)
589 blocks.append(par)
588 return ' '.join(blocks)
590 return ' '.join(blocks)
589
591
590 def _getcorechunk(self):
592 def _getcorechunk(self):
591 """yield chunk for the core part of the bundle
593 """yield chunk for the core part of the bundle
592
594
593 (all but headers and parameters)"""
595 (all but headers and parameters)"""
594 outdebug(self.ui, 'start of parts')
596 outdebug(self.ui, 'start of parts')
595 for part in self._parts:
597 for part in self._parts:
596 outdebug(self.ui, 'bundle part: "%s"' % part.type)
598 outdebug(self.ui, 'bundle part: "%s"' % part.type)
597 for chunk in part.getchunks(ui=self.ui):
599 for chunk in part.getchunks(ui=self.ui):
598 yield chunk
600 yield chunk
599 outdebug(self.ui, 'end of bundle')
601 outdebug(self.ui, 'end of bundle')
600 yield _pack(_fpartheadersize, 0)
602 yield _pack(_fpartheadersize, 0)
601
603
602
604
603 def salvageoutput(self):
605 def salvageoutput(self):
604 """return a list with a copy of all output parts in the bundle
606 """return a list with a copy of all output parts in the bundle
605
607
606 This is meant to be used during error handling to make sure we preserve
608 This is meant to be used during error handling to make sure we preserve
607 server output"""
609 server output"""
608 salvaged = []
610 salvaged = []
609 for part in self._parts:
611 for part in self._parts:
610 if part.type.startswith('output'):
612 if part.type.startswith('output'):
611 salvaged.append(part.copy())
613 salvaged.append(part.copy())
612 return salvaged
614 return salvaged
613
615
614
616
615 class unpackermixin(object):
617 class unpackermixin(object):
616 """A mixin to extract bytes and struct data from a stream"""
618 """A mixin to extract bytes and struct data from a stream"""
617
619
618 def __init__(self, fp):
620 def __init__(self, fp):
619 self._fp = fp
621 self._fp = fp
620 self._seekable = (util.safehasattr(fp, 'seek') and
622 self._seekable = (util.safehasattr(fp, 'seek') and
621 util.safehasattr(fp, 'tell'))
623 util.safehasattr(fp, 'tell'))
622
624
623 def _unpack(self, format):
625 def _unpack(self, format):
624 """unpack this struct format from the stream"""
626 """unpack this struct format from the stream"""
625 data = self._readexact(struct.calcsize(format))
627 data = self._readexact(struct.calcsize(format))
626 return _unpack(format, data)
628 return _unpack(format, data)
627
629
628 def _readexact(self, size):
630 def _readexact(self, size):
629 """read exactly <size> bytes from the stream"""
631 """read exactly <size> bytes from the stream"""
630 return changegroup.readexactly(self._fp, size)
632 return changegroup.readexactly(self._fp, size)
631
633
632 def seek(self, offset, whence=0):
634 def seek(self, offset, whence=0):
633 """move the underlying file pointer"""
635 """move the underlying file pointer"""
634 if self._seekable:
636 if self._seekable:
635 return self._fp.seek(offset, whence)
637 return self._fp.seek(offset, whence)
636 else:
638 else:
637 raise NotImplementedError(_('File pointer is not seekable'))
639 raise NotImplementedError(_('File pointer is not seekable'))
638
640
639 def tell(self):
641 def tell(self):
640 """return the file offset, or None if file is not seekable"""
642 """return the file offset, or None if file is not seekable"""
641 if self._seekable:
643 if self._seekable:
642 try:
644 try:
643 return self._fp.tell()
645 return self._fp.tell()
644 except IOError as e:
646 except IOError as e:
645 if e.errno == errno.ESPIPE:
647 if e.errno == errno.ESPIPE:
646 self._seekable = False
648 self._seekable = False
647 else:
649 else:
648 raise
650 raise
649 return None
651 return None
650
652
651 def close(self):
653 def close(self):
652 """close underlying file"""
654 """close underlying file"""
653 if util.safehasattr(self._fp, 'close'):
655 if util.safehasattr(self._fp, 'close'):
654 return self._fp.close()
656 return self._fp.close()
655
657
656 def getunbundler(ui, fp, magicstring=None):
658 def getunbundler(ui, fp, magicstring=None):
657 """return a valid unbundler object for a given magicstring"""
659 """return a valid unbundler object for a given magicstring"""
658 if magicstring is None:
660 if magicstring is None:
659 magicstring = changegroup.readexactly(fp, 4)
661 magicstring = changegroup.readexactly(fp, 4)
660 magic, version = magicstring[0:2], magicstring[2:4]
662 magic, version = magicstring[0:2], magicstring[2:4]
661 if magic != 'HG':
663 if magic != 'HG':
662 raise error.Abort(_('not a Mercurial bundle'))
664 raise error.Abort(_('not a Mercurial bundle'))
663 unbundlerclass = formatmap.get(version)
665 unbundlerclass = formatmap.get(version)
664 if unbundlerclass is None:
666 if unbundlerclass is None:
665 raise error.Abort(_('unknown bundle version %s') % version)
667 raise error.Abort(_('unknown bundle version %s') % version)
666 unbundler = unbundlerclass(ui, fp)
668 unbundler = unbundlerclass(ui, fp)
667 indebug(ui, 'start processing of %s stream' % magicstring)
669 indebug(ui, 'start processing of %s stream' % magicstring)
668 return unbundler
670 return unbundler
669
671
670 class unbundle20(unpackermixin):
672 class unbundle20(unpackermixin):
671 """interpret a bundle2 stream
673 """interpret a bundle2 stream
672
674
673 This class is fed with a binary stream and yields parts through its
675 This class is fed with a binary stream and yields parts through its
674 `iterparts` methods."""
676 `iterparts` methods."""
675
677
676 _magicstring = 'HG20'
678 _magicstring = 'HG20'
677
679
678 def __init__(self, ui, fp):
680 def __init__(self, ui, fp):
679 """If header is specified, we do not read it out of the stream."""
681 """If header is specified, we do not read it out of the stream."""
680 self.ui = ui
682 self.ui = ui
681 self._decompressor = util.decompressors[None]
683 self._decompressor = util.decompressors[None]
682 self._compressed = None
684 self._compressed = None
683 super(unbundle20, self).__init__(fp)
685 super(unbundle20, self).__init__(fp)
684
686
685 @util.propertycache
687 @util.propertycache
686 def params(self):
688 def params(self):
687 """dictionary of stream level parameters"""
689 """dictionary of stream level parameters"""
688 indebug(self.ui, 'reading bundle2 stream parameters')
690 indebug(self.ui, 'reading bundle2 stream parameters')
689 params = {}
691 params = {}
690 paramssize = self._unpack(_fstreamparamsize)[0]
692 paramssize = self._unpack(_fstreamparamsize)[0]
691 if paramssize < 0:
693 if paramssize < 0:
692 raise error.BundleValueError('negative bundle param size: %i'
694 raise error.BundleValueError('negative bundle param size: %i'
693 % paramssize)
695 % paramssize)
694 if paramssize:
696 if paramssize:
695 params = self._readexact(paramssize)
697 params = self._readexact(paramssize)
696 params = self._processallparams(params)
698 params = self._processallparams(params)
697 return params
699 return params
698
700
699 def _processallparams(self, paramsblock):
701 def _processallparams(self, paramsblock):
700 """"""
702 """"""
701 params = util.sortdict()
703 params = util.sortdict()
702 for p in paramsblock.split(' '):
704 for p in paramsblock.split(' '):
703 p = p.split('=', 1)
705 p = p.split('=', 1)
704 p = [urlreq.unquote(i) for i in p]
706 p = [urlreq.unquote(i) for i in p]
705 if len(p) < 2:
707 if len(p) < 2:
706 p.append(None)
708 p.append(None)
707 self._processparam(*p)
709 self._processparam(*p)
708 params[p[0]] = p[1]
710 params[p[0]] = p[1]
709 return params
711 return params
710
712
711
713
712 def _processparam(self, name, value):
714 def _processparam(self, name, value):
713 """process a parameter, applying its effect if needed
715 """process a parameter, applying its effect if needed
714
716
715 Parameters starting with a lower case letter are advisory and will be
717 Parameters starting with a lower case letter are advisory and will be
716 ignored when unknown. For those starting with an upper case letter, which
718 ignored when unknown. For those starting with an upper case letter, which
717 are mandatory, this function will raise a KeyError when unknown.
719 are mandatory, this function will raise a KeyError when unknown.
718
720
719 Note: no options are currently supported. Any input will be either
721 Note: no options are currently supported. Any input will be either
720 ignored or rejected.
722 ignored or rejected.
721 """
723 """
722 if not name:
724 if not name:
723 raise ValueError('empty parameter name')
725 raise ValueError('empty parameter name')
724 if name[0] not in string.letters:
726 if name[0] not in string.letters:
725 raise ValueError('non letter first character: %r' % name)
727 raise ValueError('non letter first character: %r' % name)
726 try:
728 try:
727 handler = b2streamparamsmap[name.lower()]
729 handler = b2streamparamsmap[name.lower()]
728 except KeyError:
730 except KeyError:
729 if name[0].islower():
731 if name[0].islower():
730 indebug(self.ui, "ignoring unknown parameter %r" % name)
732 indebug(self.ui, "ignoring unknown parameter %r" % name)
731 else:
733 else:
732 raise error.BundleUnknownFeatureError(params=(name,))
734 raise error.BundleUnknownFeatureError(params=(name,))
733 else:
735 else:
734 handler(self, name, value)
736 handler(self, name, value)
735
737
736 def _forwardchunks(self):
738 def _forwardchunks(self):
737 """utility to transfer a bundle2 as binary
739 """utility to transfer a bundle2 as binary
738
740
739 This is made necessary by the fact that the 'getbundle' command over 'ssh'
741 This is made necessary by the fact that the 'getbundle' command over 'ssh'
740 has no way to know when the reply ends, relying on the bundle to be
742 has no way to know when the reply ends, relying on the bundle to be
741 interpreted to know its end. This is terrible and we are sorry, but we
743 interpreted to know its end. This is terrible and we are sorry, but we
742 needed to move forward to get general delta enabled.
744 needed to move forward to get general delta enabled.
743 """
745 """
744 yield self._magicstring
746 yield self._magicstring
745 assert 'params' not in vars(self)
747 assert 'params' not in vars(self)
746 paramssize = self._unpack(_fstreamparamsize)[0]
748 paramssize = self._unpack(_fstreamparamsize)[0]
747 if paramssize < 0:
749 if paramssize < 0:
748 raise error.BundleValueError('negative bundle param size: %i'
750 raise error.BundleValueError('negative bundle param size: %i'
749 % paramssize)
751 % paramssize)
750 yield _pack(_fstreamparamsize, paramssize)
752 yield _pack(_fstreamparamsize, paramssize)
751 if paramssize:
753 if paramssize:
752 params = self._readexact(paramssize)
754 params = self._readexact(paramssize)
753 self._processallparams(params)
755 self._processallparams(params)
754 yield params
756 yield params
755 assert self._decompressor is util.decompressors[None]
757 assert self._decompressor is util.decompressors[None]
756 # From there, payload might need to be decompressed
758 # From there, payload might need to be decompressed
757 self._fp = self._decompressor(self._fp)
759 self._fp = self._decompressor(self._fp)
758 emptycount = 0
760 emptycount = 0
759 while emptycount < 2:
761 while emptycount < 2:
760 # so we can brainlessly loop
762 # so we can brainlessly loop
761 assert _fpartheadersize == _fpayloadsize
763 assert _fpartheadersize == _fpayloadsize
762 size = self._unpack(_fpartheadersize)[0]
764 size = self._unpack(_fpartheadersize)[0]
763 yield _pack(_fpartheadersize, size)
765 yield _pack(_fpartheadersize, size)
764 if size:
766 if size:
765 emptycount = 0
767 emptycount = 0
766 else:
768 else:
767 emptycount += 1
769 emptycount += 1
768 continue
770 continue
769 if size == flaginterrupt:
771 if size == flaginterrupt:
770 continue
772 continue
771 elif size < 0:
773 elif size < 0:
772 raise error.BundleValueError('negative chunk size: %i')
774 raise error.BundleValueError('negative chunk size: %i')
773 yield self._readexact(size)
775 yield self._readexact(size)
774
776
775
777
776 def iterparts(self):
778 def iterparts(self):
777 """yield all parts contained in the stream"""
779 """yield all parts contained in the stream"""
780 # make sure params have been loaded
782 # make sure params have been loaded
779 self.params
781 self.params
780 # From there, payload need to be decompressed
782 # From there, payload need to be decompressed
781 self._fp = self._decompressor(self._fp)
783 self._fp = self._decompressor(self._fp)
782 indebug(self.ui, 'start extraction of bundle2 parts')
784 indebug(self.ui, 'start extraction of bundle2 parts')
783 headerblock = self._readpartheader()
785 headerblock = self._readpartheader()
784 while headerblock is not None:
786 while headerblock is not None:
785 part = unbundlepart(self.ui, headerblock, self._fp)
787 part = unbundlepart(self.ui, headerblock, self._fp)
786 yield part
788 yield part
787 part.seek(0, 2)
789 part.seek(0, 2)
788 headerblock = self._readpartheader()
790 headerblock = self._readpartheader()
789 indebug(self.ui, 'end of bundle2 stream')
791 indebug(self.ui, 'end of bundle2 stream')
790
792
791 def _readpartheader(self):
793 def _readpartheader(self):
792 """reads a part header size and return the bytes blob
794 """reads a part header size and return the bytes blob
793
795
794 returns None if empty"""
796 returns None if empty"""
795 headersize = self._unpack(_fpartheadersize)[0]
797 headersize = self._unpack(_fpartheadersize)[0]
796 if headersize < 0:
798 if headersize < 0:
797 raise error.BundleValueError('negative part header size: %i'
799 raise error.BundleValueError('negative part header size: %i'
798 % headersize)
800 % headersize)
799 indebug(self.ui, 'part header size: %i' % headersize)
801 indebug(self.ui, 'part header size: %i' % headersize)
800 if headersize:
802 if headersize:
801 return self._readexact(headersize)
803 return self._readexact(headersize)
802 return None
804 return None
803
805
804 def compressed(self):
806 def compressed(self):
805 self.params # load params
807 self.params # load params
806 return self._compressed
808 return self._compressed
807
809
808 formatmap = {'20': unbundle20}
810 formatmap = {'20': unbundle20}
809
811
810 b2streamparamsmap = {}
812 b2streamparamsmap = {}
811
813
812 def b2streamparamhandler(name):
814 def b2streamparamhandler(name):
813 """register a handler for a stream level parameter"""
815 """register a handler for a stream level parameter"""
814 def decorator(func):
816 def decorator(func):
815 assert name not in formatmap
817 assert name not in formatmap
816 b2streamparamsmap[name] = func
818 b2streamparamsmap[name] = func
817 return func
819 return func
818 return decorator
820 return decorator
819
821
820 @b2streamparamhandler('compression')
822 @b2streamparamhandler('compression')
821 def processcompression(unbundler, param, value):
823 def processcompression(unbundler, param, value):
822 """read compression parameter and install payload decompression"""
824 """read compression parameter and install payload decompression"""
823 if value not in util.decompressors:
825 if value not in util.decompressors:
824 raise error.BundleUnknownFeatureError(params=(param,),
826 raise error.BundleUnknownFeatureError(params=(param,),
825 values=(value,))
827 values=(value,))
826 unbundler._decompressor = util.decompressors[value]
828 unbundler._decompressor = util.decompressors[value]
827 if value is not None:
829 if value is not None:
828 unbundler._compressed = True
830 unbundler._compressed = True
829
831
830 class bundlepart(object):
832 class bundlepart(object):
831 """A bundle2 part contains application level payload
833 """A bundle2 part contains application level payload
832
834
833 The part `type` is used to route the part to the application level
835 The part `type` is used to route the part to the application level
834 handler.
836 handler.
835
837
836 The part payload is contained in ``part.data``. It could be raw bytes or a
838 The part payload is contained in ``part.data``. It could be raw bytes or a
837 generator of byte chunks.
839 generator of byte chunks.
838
840
839 You can add parameters to the part using the ``addparam`` method.
841 You can add parameters to the part using the ``addparam`` method.
840 Parameters can be either mandatory (default) or advisory. Remote side
842 Parameters can be either mandatory (default) or advisory. Remote side
841 should be able to safely ignore the advisory ones.
843 should be able to safely ignore the advisory ones.
842
844
843 Neither data nor parameters can be modified after the generation has begun.
845 Neither data nor parameters can be modified after the generation has begun.
844 """
846 """
845
847
846 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
848 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
847 data='', mandatory=True):
849 data='', mandatory=True):
848 validateparttype(parttype)
850 validateparttype(parttype)
849 self.id = None
851 self.id = None
850 self.type = parttype
852 self.type = parttype
851 self._data = data
853 self._data = data
852 self._mandatoryparams = list(mandatoryparams)
854 self._mandatoryparams = list(mandatoryparams)
853 self._advisoryparams = list(advisoryparams)
855 self._advisoryparams = list(advisoryparams)
854 # checking for duplicated entries
856 # checking for duplicated entries
855 self._seenparams = set()
857 self._seenparams = set()
856 for pname, __ in self._mandatoryparams + self._advisoryparams:
858 for pname, __ in self._mandatoryparams + self._advisoryparams:
857 if pname in self._seenparams:
859 if pname in self._seenparams:
858 raise RuntimeError('duplicated params: %s' % pname)
860 raise RuntimeError('duplicated params: %s' % pname)
859 self._seenparams.add(pname)
861 self._seenparams.add(pname)
860 # status of the part's generation:
862 # status of the part's generation:
861 # - None: not started,
863 # - None: not started,
862 # - False: currently generated,
864 # - False: currently generated,
863 # - True: generation done.
865 # - True: generation done.
864 self._generated = None
866 self._generated = None
865 self.mandatory = mandatory
867 self.mandatory = mandatory
866
868
867 def copy(self):
869 def copy(self):
868 """return a copy of the part
870 """return a copy of the part
869
871
870 The new part has the very same content but no partid assigned yet.
872 The new part has the very same content but no partid assigned yet.
871 Parts with generated data cannot be copied."""
873 Parts with generated data cannot be copied."""
872 assert not util.safehasattr(self.data, 'next')
874 assert not util.safehasattr(self.data, 'next')
873 return self.__class__(self.type, self._mandatoryparams,
875 return self.__class__(self.type, self._mandatoryparams,
874 self._advisoryparams, self._data, self.mandatory)
876 self._advisoryparams, self._data, self.mandatory)
875
877
876 # methods used to defines the part content
878 # methods used to defines the part content
877 @property
879 @property
878 def data(self):
880 def data(self):
879 return self._data
881 return self._data
880
882
881 @data.setter
883 @data.setter
882 def data(self, data):
884 def data(self, data):
883 if self._generated is not None:
885 if self._generated is not None:
884 raise error.ReadOnlyPartError('part is being generated')
886 raise error.ReadOnlyPartError('part is being generated')
885 self._data = data
887 self._data = data
886
888
887 @property
889 @property
888 def mandatoryparams(self):
890 def mandatoryparams(self):
889 # make it an immutable tuple to force people through ``addparam``
891 # make it an immutable tuple to force people through ``addparam``
890 return tuple(self._mandatoryparams)
892 return tuple(self._mandatoryparams)
891
893
892 @property
894 @property
893 def advisoryparams(self):
895 def advisoryparams(self):
894 # make it an immutable tuple to force people through ``addparam``
896 # make it an immutable tuple to force people through ``addparam``
895 return tuple(self._advisoryparams)
897 return tuple(self._advisoryparams)
896
898
897 def addparam(self, name, value='', mandatory=True):
899 def addparam(self, name, value='', mandatory=True):
898 if self._generated is not None:
900 if self._generated is not None:
899 raise error.ReadOnlyPartError('part is being generated')
901 raise error.ReadOnlyPartError('part is being generated')
900 if name in self._seenparams:
902 if name in self._seenparams:
901 raise ValueError('duplicated params: %s' % name)
903 raise ValueError('duplicated params: %s' % name)
902 self._seenparams.add(name)
904 self._seenparams.add(name)
903 params = self._advisoryparams
905 params = self._advisoryparams
904 if mandatory:
906 if mandatory:
905 params = self._mandatoryparams
907 params = self._mandatoryparams
906 params.append((name, value))
908 params.append((name, value))
907
909
908 # methods used to generates the bundle2 stream
910 # methods used to generates the bundle2 stream
909 def getchunks(self, ui):
911 def getchunks(self, ui):
910 if self._generated is not None:
912 if self._generated is not None:
911 raise RuntimeError('part can only be consumed once')
913 raise RuntimeError('part can only be consumed once')
912 self._generated = False
914 self._generated = False
913
915
914 if ui.debugflag:
916 if ui.debugflag:
915 msg = ['bundle2-output-part: "%s"' % self.type]
917 msg = ['bundle2-output-part: "%s"' % self.type]
916 if not self.mandatory:
918 if not self.mandatory:
917 msg.append(' (advisory)')
919 msg.append(' (advisory)')
918 nbmp = len(self.mandatoryparams)
920 nbmp = len(self.mandatoryparams)
919 nbap = len(self.advisoryparams)
921 nbap = len(self.advisoryparams)
920 if nbmp or nbap:
922 if nbmp or nbap:
921 msg.append(' (params:')
923 msg.append(' (params:')
922 if nbmp:
924 if nbmp:
923 msg.append(' %i mandatory' % nbmp)
925 msg.append(' %i mandatory' % nbmp)
924 if nbap:
926 if nbap:
925 msg.append(' %i advisory' % nbmp)
927 msg.append(' %i advisory' % nbmp)
926 msg.append(')')
928 msg.append(')')
927 if not self.data:
929 if not self.data:
928 msg.append(' empty payload')
930 msg.append(' empty payload')
929 elif util.safehasattr(self.data, 'next'):
931 elif util.safehasattr(self.data, 'next'):
930 msg.append(' streamed payload')
932 msg.append(' streamed payload')
931 else:
933 else:
932 msg.append(' %i bytes payload' % len(self.data))
934 msg.append(' %i bytes payload' % len(self.data))
933 msg.append('\n')
935 msg.append('\n')
934 ui.debug(''.join(msg))
936 ui.debug(''.join(msg))
935
937
936 #### header
938 #### header
937 if self.mandatory:
939 if self.mandatory:
938 parttype = self.type.upper()
940 parttype = self.type.upper()
939 else:
941 else:
940 parttype = self.type.lower()
942 parttype = self.type.lower()
941 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
943 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
942 ## parttype
944 ## parttype
943 header = [_pack(_fparttypesize, len(parttype)),
945 header = [_pack(_fparttypesize, len(parttype)),
944 parttype, _pack(_fpartid, self.id),
946 parttype, _pack(_fpartid, self.id),
945 ]
947 ]
946 ## parameters
948 ## parameters
947 # count
949 # count
948 manpar = self.mandatoryparams
950 manpar = self.mandatoryparams
949 advpar = self.advisoryparams
951 advpar = self.advisoryparams
950 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
952 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
951 # size
953 # size
952 parsizes = []
954 parsizes = []
953 for key, value in manpar:
955 for key, value in manpar:
954 parsizes.append(len(key))
956 parsizes.append(len(key))
955 parsizes.append(len(value))
957 parsizes.append(len(value))
956 for key, value in advpar:
958 for key, value in advpar:
957 parsizes.append(len(key))
959 parsizes.append(len(key))
958 parsizes.append(len(value))
960 parsizes.append(len(value))
959 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
961 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
960 header.append(paramsizes)
962 header.append(paramsizes)
961 # key, value
963 # key, value
962 for key, value in manpar:
964 for key, value in manpar:
963 header.append(key)
965 header.append(key)
964 header.append(value)
966 header.append(value)
965 for key, value in advpar:
967 for key, value in advpar:
966 header.append(key)
968 header.append(key)
967 header.append(value)
969 header.append(value)
968 ## finalize header
970 ## finalize header
969 headerchunk = ''.join(header)
971 headerchunk = ''.join(header)
970 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
972 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
971 yield _pack(_fpartheadersize, len(headerchunk))
973 yield _pack(_fpartheadersize, len(headerchunk))
972 yield headerchunk
974 yield headerchunk
973 ## payload
975 ## payload
974 try:
976 try:
975 for chunk in self._payloadchunks():
977 for chunk in self._payloadchunks():
976 outdebug(ui, 'payload chunk size: %i' % len(chunk))
978 outdebug(ui, 'payload chunk size: %i' % len(chunk))
977 yield _pack(_fpayloadsize, len(chunk))
979 yield _pack(_fpayloadsize, len(chunk))
978 yield chunk
980 yield chunk
979 except GeneratorExit:
981 except GeneratorExit:
980 # GeneratorExit means that nobody is listening for our
982 # GeneratorExit means that nobody is listening for our
981 # results anyway, so just bail quickly rather than trying
983 # results anyway, so just bail quickly rather than trying
982 # to produce an error part.
984 # to produce an error part.
983 ui.debug('bundle2-generatorexit\n')
985 ui.debug('bundle2-generatorexit\n')
984 raise
986 raise
985 except BaseException as exc:
987 except BaseException as exc:
986 # backup exception data for later
988 # backup exception data for later
987 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
989 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
988 % exc)
990 % exc)
989 exc_info = sys.exc_info()
991 exc_info = sys.exc_info()
990 msg = 'unexpected error: %s' % exc
992 msg = 'unexpected error: %s' % exc
991 interpart = bundlepart('error:abort', [('message', msg)],
993 interpart = bundlepart('error:abort', [('message', msg)],
992 mandatory=False)
994 mandatory=False)
993 interpart.id = 0
995 interpart.id = 0
994 yield _pack(_fpayloadsize, -1)
996 yield _pack(_fpayloadsize, -1)
995 for chunk in interpart.getchunks(ui=ui):
997 for chunk in interpart.getchunks(ui=ui):
996 yield chunk
998 yield chunk
997 outdebug(ui, 'closing payload chunk')
999 outdebug(ui, 'closing payload chunk')
998 # abort current part payload
1000 # abort current part payload
999 yield _pack(_fpayloadsize, 0)
1001 yield _pack(_fpayloadsize, 0)
1000 if pycompat.ispy3:
1002 if pycompat.ispy3:
1001 raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
1003 raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
1002 else:
1004 else:
1003 exec("""raise exc_info[0], exc_info[1], exc_info[2]""")
1005 exec("""raise exc_info[0], exc_info[1], exc_info[2]""")
1004 # end of payload
1006 # end of payload
1005 outdebug(ui, 'closing payload chunk')
1007 outdebug(ui, 'closing payload chunk')
1006 yield _pack(_fpayloadsize, 0)
1008 yield _pack(_fpayloadsize, 0)
1007 self._generated = True
1009 self._generated = True
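        # To summarize the framing produced above (as derived from this code):
        # a part is emitted as an int32 header size, the header chunk itself,
        # then a sequence of payload chunks each preceded by an int32 length,
        # and finally a zero length closing the payload. A length of -1 flags
        # an out-of-band interrupt part (see ``flaginterrupt`` below).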

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if util.safehasattr(self.data, 'next'):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


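# In-band marker used in the payload framing: a chunk length of -1 tells the
# reader that an out-of-band part follows and must be processed by
# ``interrupthandler`` before resuming the current payload.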
flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting an exception raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise RuntimeError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] #(payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic-related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, super(unbundlepart, self).tell()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            super(unbundlepart, self).seek(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos,
                                             super(unbundlepart, self).tell()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')
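        # Worked example (illustrative): with ``self._chunkindex`` equal to
        # [(0, f0), (32768, f1), (65536, f2)], ``_findchunk(0)`` returns
        # (0, 0), ``_findchunk(32768)`` returns (1, 0) and
        # ``_findchunk(40000)`` returns (1, 7232).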

    def _readheader(self):
        """read the header and set up the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % self.id)
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # the header has been read, record it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos
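        # ``seek``/``tell`` let a consumer re-read a payload it has already
        # gone through, using the chunk index built by ``_payloadchunks``.
        # Sketch of the intended use (illustrative only):
        #
        #     part.read()    # consume once, populating the chunk index
        #     part.seek(0)   # rewind to the start of the payload
        #     part.read(10)  # re-read the first ten bytes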

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    return caps
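    # For illustration only: the exact values depend on the repository, but a
    # typical result extends the static table with something like
    # ``caps['changegroup'] == ('01', '02')`` (possibly '03' with tree
    # manifests), plus ``obsmarkers`` and ``pushback`` entries when those
    # features are enabled.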

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]
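    # For example: obsmarkersversion({'obsmarkers': ('V0', 'V1')}) returns
    # [0, 1]; a missing or empty 'obsmarkers' entry yields [].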

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', str(cg.extras['clcount']),
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only supports v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compressors:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        z = util.compressors[comp]()
        subchunkiter = cg.getchunks()
        def chunkiter():
            yield header
            for chunk in subchunkiter:
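                # A streaming compressor may buffer its input and hand back an
                # empty string from compress(); such empty results are simply
                # skipped instead of being emitted as zero-length chunks (the
                # previous code yielded z.compress(chunk) unconditionally).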
                data = z.compress(chunk)
                if data:
                    yield data
            yield z.flush()
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
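# Typical calls (illustrative only; the accepted values come from
# ``bundletypes`` and ``util.compressors``):
#
#     writebundle(ui, cg, 'out.hg', 'HG20', compression='BZ')   # bundle2
#     writebundle(ui, cg, 'out.hg', 'HG10BZ')                   # legacy bundle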

@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will see massive rework before
    being inflicted on any end-user.
    """
    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the ones contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
    ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate that what was
        retrieved by the client matches what the server knows about the bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest with
        that name. Like the size, it is used to validate that what was
        retrieved by the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
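    # Illustrative (hypothetical) parameter set for such a part:
    #   url         = 'https://example.com/bundles/abc.hg'
    #   size        = '1048576'
    #   digests     = 'sha1'
    #   digest:sha1 = 'f1d2d2f924e986ac86fdf7b36c94bcdf32beec15'
    # Both the size and every listed digest are validated below.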
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
            parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
            % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
            util.hidepassword(raw_url))
    ret = cg.apply(op.repo, 'bundle2', 'bundle2')
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
            (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # we want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)