bundle2: use ProgrammingError
Jun Wu
r31647:4dbef666 default

@@ -1,1626 +1,1626 @@
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is architectured as follows:

 - magic string
 - stream level parameters
 - payload parts (any number)
 - end of stream marker.

the Binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

Binary format is as follows:

:params size: int32

  The total number of Bytes used by the parameters

:params value: arbitrary number of Bytes

  A blob of `params size` containing the serialized version of all stream level
  parameters.

  The blob contains a space separated list of parameters. Parameters with value
  are stored in the form `<name>=<value>`. Both name and value are urlquoted.

  Empty names are obviously forbidden.

  Name MUST start with a letter. If this first letter is lower case, the
  parameter is advisory and can be safely ignored. However when the first
  letter is capital, the parameter is mandatory and the bundling process MUST
  stop if it is not able to process it.

  Stream parameters use a simple textual format for two main reasons:

  - Stream level parameters should remain simple and we want to discourage any
    crazy usage.
  - Textual data allow easy human inspection of a bundle2 header in case of
    troubles.

  Any Applicative level options MUST go into a bundle2 part instead.

Payload part
------------------------

Binary format is as follows:

:header size: int32

  The total number of Bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

    The header defines how to interpret the part. It contains two pieces of
    data: the part type, and the part parameters.

    The part type is used to route to an application level handler that can
    interpret the payload.

    Part parameters are passed to the application level handler. They are
    meant to convey information that will help the application level object to
    interpret the part payload.

    The binary format of the header is as follows:

    :typesize: (one byte)

    :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

    :partid: A 32bits integer (unique in the bundle) that can be used to refer
             to this part.

    :parameters:

        Part's parameter may have arbitrary content, the binary structure is::

            <mandatory-count><advisory-count><param-sizes><param-data>

        :mandatory-count: 1 byte, number of mandatory parameters

        :advisory-count:  1 byte, number of advisory parameters

        :param-sizes:

            N couples of bytes, where N is the total number of parameters. Each
            couple contains (<size-of-key>, <size-of-value>) for one parameter.

        :param-data:

            A blob of bytes from which each parameter key and value can be
            retrieved using the list of size couples stored in the previous
            field.

            Mandatory parameters come first, then the advisory ones.

            Each parameter's key MUST be unique within the part.

:payload:

    payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as much as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are registered
for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the Part type
contains any uppercase char it is considered mandatory. When no handler is
known for a Mandatory part, the process is aborted and an exception is raised.
If the part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read after an abort are processed. In the
future, dropping the stream may become an option for channels we do not care to
preserve.
"""

from __future__ import absolute_import

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    pushkey,
    pycompat,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
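
# ---------------------------------------------------------------------------
# Editor's note: the demo below is an illustration added for this review and
# is NOT part of the original changeset. It is a minimal sketch of the
# stream-level framing described in the module docstring above (magic string,
# int32 parameter-block size, parameter blob, parts, then an empty part
# header as the end-of-stream marker), assuming the Python 2 byte-string
# semantics this module targets. The name _demoframing is hypothetical.
def _demoframing(paramsblob=''):
    """return the bytes of a bundle2 stream that carries no parts"""
    import struct
    chunks = []
    chunks.append('HG20')                              # magic string
    chunks.append(struct.pack('>i', len(paramsblob)))  # params size (int32)
    if paramsblob:
        chunks.append(paramsblob)                      # space separated, urlquoted
    chunks.append(struct.pack('>i', 0))                # empty header = end marker
    return ''.join(chunks)
# ---------------------------------------------------------------------------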

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>'+('BB'*nbparams)
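
# Editor's note: illustrative only, not part of the original changeset. It
# shows the dynamic struct format built above: for two parameters the format
# is '>BBBB', i.e. four unsigned bytes holding (key size, value size) couples.
def _demoparamsizes():
    import struct
    fmt = _makefpartparamsizes(2)          # -> '>BBBB'
    packed = struct.pack(fmt, 3, 5, 4, 0)  # two (key-size, value-size) couples
    assert struct.unpack(fmt, packed) == (3, 5, 4, 0)
    return fmt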

parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator
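
# Editor's note: illustrative only, not part of the original changeset. A toy
# registration showing how the decorator above is used; the part type
# 'demo:noop' and its 'key' parameter are hypothetical.
@parthandler('demo:noop', ('key',))
def _demonoophandler(op, part):
    """do nothing beyond recording that the part was seen"""
    op.records.add('demo:noop', part.params)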

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__
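
# Editor's note: illustrative only, not part of the original changeset. The
# expected behaviour of the record container above, per its docstring.
def _demorecords():
    records = unbundlerecords()
    records.add('changegroup', {'return': 1})
    records.add('output', 'some text', inreplyto=0)
    assert records['changegroup'] == ({'return': 1},)
    assert list(records.getreplies(0)) == [('output', 'some text')]
    return records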

class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.gettransaction = transactiongetter
        self.reply = None
        self.captureoutput = captureoutput

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()
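
# Editor's note: illustrative only, not part of the original changeset. When
# no transaction getter is supplied, a part handler that asks for a
# transaction gets the exception above instead of silently creating one.
def _demonotransaction():
    try:
        _notransaction()
    except TransactionUnavailable:
        return 'no transaction was expected for this bundle'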

def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    tr.hookargs['bundle2'] = '1'
    if source is not None and 'source' not in tr.hookargs:
        tr.hookargs['source'] = source
    if url is not None and 'url' not in tr.hookargs:
        tr.hookargs['url'] = url
    return processbundle(repo, unbundler, lambda: tr, op=op)

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying its effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    Unknown Mandatory parts will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this is a init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params')
        if op.gettransaction is None:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except Exception as exc:
        for nbpart, part in iterparts:
            # consume the bundle content
            part.seek(0, 2)
        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from processing the old format. This is mostly
        # needed to handle different return codes to unbundle according to the
        # type of bundle. We should probably clean up or drop this return code
        # craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged
        raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op
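
# Editor's note: illustrative only, not part of the original changeset. A
# sketch of how the entry points above are typically driven; `repo` and `fp`
# are assumed to be supplied by the caller (e.g. the exchange layer), and the
# transaction handling follows the usual repo.transaction()/close()/release()
# pattern rather than anything defined in this module.
def _demoapplybundle(repo, fp):
    unbundler = getunbundler(repo.ui, fp)  # defined further down this module
    tr = repo.transaction('demo-unbundle')
    try:
        # applybundle() tags the transaction hookargs and delegates to
        # processbundle(), which dispatches each part to its handler
        op = applybundle(repo, unbundler, tr, source='demo')
        tr.close()
    finally:
        tr.release()
    return op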

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown'    # used by debug output
    hardabort = False
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                      params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbmp)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam('in-reply-to', str(part.id), mandatory=False)
    # If exiting or interrupted, do not attempt to seek the stream in the
    # finally block below. This makes abort faster.
    except (SystemExit, KeyboardInterrupt):
        hardabort = True
        raise
    finally:
        # consume the part content to not corrupt the stream.
        if not hardabort:
            part.seek(0, 2)


def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)
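
# Editor's note: illustrative only, not part of the original changeset. A
# round-trip of the two helpers above with a hypothetical capability set.
def _democaps():
    caps = {'HG20': [], 'changegroup': ['01', '02']}
    blob = encodecaps(caps)
    # one capability per line, values comma separated and urlquoted:
    #   HG20
    #   changegroup=01,02
    assert decodecaps(blob) == {'HG20': [], 'changegroup': ['01', '02']}
    return blob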

bundletypes = {
    "": ("", 'UN'),       # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add a stream level parameter and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means that
        any failure to properly initialize the part after calling ``newpart``
        should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding a part if you
        need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we preserve
        server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged
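
# Editor's note: illustrative only, not part of the original changeset.
# Assembling an outgoing container with the class above; `ui` is assumed to be
# a Mercurial ui instance and `fh` a writable binary file object supplied by
# the caller, and the part type/parameter are hypothetical.
def _demobundle20(ui, fh):
    bundler = bundle20(ui)
    part = bundler.newpart('demo:noop', data='hello', mandatory=False)
    part.addparam('key', 'value', mandatory=False)
    for chunk in bundler.getchunks():
        fh.write(chunk)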


class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))

    def _unpack(self, format):
        """unpack this struct format from the stream"""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream"""
        return changegroup.readexactly(self._fp, size)

    def seek(self, offset, whence=0):
        """move the underlying file pointer"""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def tell(self):
        """return the file offset, or None if file is not seekable"""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler
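
# Editor's note: illustrative only, not part of the original changeset.
# Reading a bundle2 file back with the helper above; `ui` is assumed to be a
# Mercurial ui instance and 'bundle.hg' a hypothetical path.
def _demoreadbundle(ui):
    with open('bundle.hg', 'rb') as fp:
        unbundler = getunbundler(ui, fp)
        # unbundle20.iterparts() (defined below) yields each part in order
        return [part.type for part in unbundler.iterparts()]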

class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` methods."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory, and this function will raise a KeyError when they are
        unknown.

        Note: no options are currently supported. Any input will be either
        ignored or failing.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact the 'getbundle' command over 'ssh'
        has no way to know when the reply ends, relying on the bundle to be
        interpreted to know its end. This is terrible and we are sorry, but we
        needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i')
            yield self._readexact(size)


    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure param have been loaded
        self.params
        # From there, payload need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """reads a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in formatmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True
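
# Editor's note: illustrative only, not part of the original changeset. A toy
# stream level parameter handler registered with the decorator above; the
# parameter name 'demoflag' is hypothetical and, being lower case, advisory,
# so receivers that do not know it may simply ignore it.
@b2streamparamhandler('demoflag')
def _demoprocessflag(unbundler, param, value):
    """record that the flag was seen on the incoming stream"""
    indebug(unbundler.ui, 'demoflag stream parameter seen: %r' % (value,))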
829
829
830 class bundlepart(object):
830 class bundlepart(object):
831 """A bundle2 part contains application level payload
831 """A bundle2 part contains application level payload
832
832
833 The part `type` is used to route the part to the application level
833 The part `type` is used to route the part to the application level
834 handler.
834 handler.
835
835
836 The part payload is contained in ``part.data``. It could be raw bytes or a
836 The part payload is contained in ``part.data``. It could be raw bytes or a
837 generator of byte chunks.
837 generator of byte chunks.
838
838
839 You can add parameters to the part using the ``addparam`` method.
839 You can add parameters to the part using the ``addparam`` method.
840 Parameters can be either mandatory (default) or advisory. Remote side
840 Parameters can be either mandatory (default) or advisory. Remote side
841 should be able to safely ignore the advisory ones.
841 should be able to safely ignore the advisory ones.
842
842
843 Both data and parameters cannot be modified after the generation has begun.
843 Both data and parameters cannot be modified after the generation has begun.
844 """
844 """
845
845
846 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
846 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
847 data='', mandatory=True):
847 data='', mandatory=True):
848 validateparttype(parttype)
848 validateparttype(parttype)
849 self.id = None
849 self.id = None
850 self.type = parttype
850 self.type = parttype
851 self._data = data
851 self._data = data
852 self._mandatoryparams = list(mandatoryparams)
852 self._mandatoryparams = list(mandatoryparams)
853 self._advisoryparams = list(advisoryparams)
853 self._advisoryparams = list(advisoryparams)
854 # checking for duplicated entries
854 # checking for duplicated entries
855 self._seenparams = set()
855 self._seenparams = set()
856 for pname, __ in self._mandatoryparams + self._advisoryparams:
856 for pname, __ in self._mandatoryparams + self._advisoryparams:
857 if pname in self._seenparams:
857 if pname in self._seenparams:
858 raise RuntimeError('duplicated params: %s' % pname)
858 raise error.ProgrammingError('duplicated params: %s' % pname)
859 self._seenparams.add(pname)
859 self._seenparams.add(pname)
860 # status of the part's generation:
860 # status of the part's generation:
861 # - None: not started,
861 # - None: not started,
862 # - False: currently generated,
862 # - False: currently generated,
863 # - True: generation done.
863 # - True: generation done.
864 self._generated = None
864 self._generated = None
865 self.mandatory = mandatory
865 self.mandatory = mandatory
866
866
867 def __repr__(self):
867 def __repr__(self):
868 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
868 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
869 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
869 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
870 % (cls, id(self), self.id, self.type, self.mandatory))
870 % (cls, id(self), self.id, self.type, self.mandatory))
871
871
872 def copy(self):
872 def copy(self):
873 """return a copy of the part
873 """return a copy of the part
874
874
875 The new part has the very same content but no partid assigned yet.
875 The new part has the very same content but no partid assigned yet.
876 Parts with generated data cannot be copied."""
876 Parts with generated data cannot be copied."""
877 assert not util.safehasattr(self.data, 'next')
877 assert not util.safehasattr(self.data, 'next')
878 return self.__class__(self.type, self._mandatoryparams,
878 return self.__class__(self.type, self._mandatoryparams,
879 self._advisoryparams, self._data, self.mandatory)
879 self._advisoryparams, self._data, self.mandatory)
880
880
881 # methods used to define the part content
881 # methods used to define the part content
882 @property
882 @property
883 def data(self):
883 def data(self):
884 return self._data
884 return self._data
885
885
886 @data.setter
886 @data.setter
887 def data(self, data):
887 def data(self, data):
888 if self._generated is not None:
888 if self._generated is not None:
889 raise error.ReadOnlyPartError('part is being generated')
889 raise error.ReadOnlyPartError('part is being generated')
890 self._data = data
890 self._data = data
891
891
892 @property
892 @property
893 def mandatoryparams(self):
893 def mandatoryparams(self):
894 # make it an immutable tuple to force people through ``addparam``
894 # make it an immutable tuple to force people through ``addparam``
895 return tuple(self._mandatoryparams)
895 return tuple(self._mandatoryparams)
896
896
897 @property
897 @property
898 def advisoryparams(self):
898 def advisoryparams(self):
899 # make it an immutable tuple to force people through ``addparam``
899 # make it an immutable tuple to force people through ``addparam``
900 return tuple(self._advisoryparams)
900 return tuple(self._advisoryparams)
901
901
902 def addparam(self, name, value='', mandatory=True):
902 def addparam(self, name, value='', mandatory=True):
903 if self._generated is not None:
903 if self._generated is not None:
904 raise error.ReadOnlyPartError('part is being generated')
904 raise error.ReadOnlyPartError('part is being generated')
905 if name in self._seenparams:
905 if name in self._seenparams:
906 raise ValueError('duplicated params: %s' % name)
906 raise ValueError('duplicated params: %s' % name)
907 self._seenparams.add(name)
907 self._seenparams.add(name)
908 params = self._advisoryparams
908 params = self._advisoryparams
909 if mandatory:
909 if mandatory:
910 params = self._mandatoryparams
910 params = self._mandatoryparams
911 params.append((name, value))
911 params.append((name, value))
912
912
913 # methods used to generate the bundle2 stream
913 # methods used to generate the bundle2 stream
914 def getchunks(self, ui):
914 def getchunks(self, ui):
915 if self._generated is not None:
915 if self._generated is not None:
916 raise RuntimeError('part can only be consumed once')
916 raise error.ProgrammingError('part can only be consumed once')
917 self._generated = False
917 self._generated = False
918
918
919 if ui.debugflag:
919 if ui.debugflag:
920 msg = ['bundle2-output-part: "%s"' % self.type]
920 msg = ['bundle2-output-part: "%s"' % self.type]
921 if not self.mandatory:
921 if not self.mandatory:
922 msg.append(' (advisory)')
922 msg.append(' (advisory)')
923 nbmp = len(self.mandatoryparams)
923 nbmp = len(self.mandatoryparams)
924 nbap = len(self.advisoryparams)
924 nbap = len(self.advisoryparams)
925 if nbmp or nbap:
925 if nbmp or nbap:
926 msg.append(' (params:')
926 msg.append(' (params:')
927 if nbmp:
927 if nbmp:
928 msg.append(' %i mandatory' % nbmp)
928 msg.append(' %i mandatory' % nbmp)
929 if nbap:
929 if nbap:
930 msg.append(' %i advisory' % nbap)
930 msg.append(' %i advisory' % nbap)
931 msg.append(')')
931 msg.append(')')
932 if not self.data:
932 if not self.data:
933 msg.append(' empty payload')
933 msg.append(' empty payload')
934 elif util.safehasattr(self.data, 'next'):
934 elif util.safehasattr(self.data, 'next'):
935 msg.append(' streamed payload')
935 msg.append(' streamed payload')
936 else:
936 else:
937 msg.append(' %i bytes payload' % len(self.data))
937 msg.append(' %i bytes payload' % len(self.data))
938 msg.append('\n')
938 msg.append('\n')
939 ui.debug(''.join(msg))
939 ui.debug(''.join(msg))
940
940
941 #### header
941 #### header
942 if self.mandatory:
942 if self.mandatory:
943 parttype = self.type.upper()
943 parttype = self.type.upper()
944 else:
944 else:
945 parttype = self.type.lower()
945 parttype = self.type.lower()
946 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
946 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
947 ## parttype
947 ## parttype
948 header = [_pack(_fparttypesize, len(parttype)),
948 header = [_pack(_fparttypesize, len(parttype)),
949 parttype, _pack(_fpartid, self.id),
949 parttype, _pack(_fpartid, self.id),
950 ]
950 ]
951 ## parameters
951 ## parameters
952 # count
952 # count
953 manpar = self.mandatoryparams
953 manpar = self.mandatoryparams
954 advpar = self.advisoryparams
954 advpar = self.advisoryparams
955 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
955 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
956 # size
956 # size
957 parsizes = []
957 parsizes = []
958 for key, value in manpar:
958 for key, value in manpar:
959 parsizes.append(len(key))
959 parsizes.append(len(key))
960 parsizes.append(len(value))
960 parsizes.append(len(value))
961 for key, value in advpar:
961 for key, value in advpar:
962 parsizes.append(len(key))
962 parsizes.append(len(key))
963 parsizes.append(len(value))
963 parsizes.append(len(value))
964 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
964 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
965 header.append(paramsizes)
965 header.append(paramsizes)
966 # key, value
966 # key, value
967 for key, value in manpar:
967 for key, value in manpar:
968 header.append(key)
968 header.append(key)
969 header.append(value)
969 header.append(value)
970 for key, value in advpar:
970 for key, value in advpar:
971 header.append(key)
971 header.append(key)
972 header.append(value)
972 header.append(value)
973 ## finalize header
973 ## finalize header
974 headerchunk = ''.join(header)
974 headerchunk = ''.join(header)
975 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
975 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
976 yield _pack(_fpartheadersize, len(headerchunk))
976 yield _pack(_fpartheadersize, len(headerchunk))
977 yield headerchunk
977 yield headerchunk
978 ## payload
978 ## payload
979 try:
979 try:
980 for chunk in self._payloadchunks():
980 for chunk in self._payloadchunks():
981 outdebug(ui, 'payload chunk size: %i' % len(chunk))
981 outdebug(ui, 'payload chunk size: %i' % len(chunk))
982 yield _pack(_fpayloadsize, len(chunk))
982 yield _pack(_fpayloadsize, len(chunk))
983 yield chunk
983 yield chunk
984 except GeneratorExit:
984 except GeneratorExit:
985 # GeneratorExit means that nobody is listening for our
985 # GeneratorExit means that nobody is listening for our
986 # results anyway, so just bail quickly rather than trying
986 # results anyway, so just bail quickly rather than trying
987 # to produce an error part.
987 # to produce an error part.
988 ui.debug('bundle2-generatorexit\n')
988 ui.debug('bundle2-generatorexit\n')
989 raise
989 raise
990 except BaseException as exc:
990 except BaseException as exc:
991 # backup exception data for later
991 # backup exception data for later
992 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
992 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
993 % exc)
993 % exc)
994 exc_info = sys.exc_info()
994 exc_info = sys.exc_info()
995 msg = 'unexpected error: %s' % exc
995 msg = 'unexpected error: %s' % exc
996 interpart = bundlepart('error:abort', [('message', msg)],
996 interpart = bundlepart('error:abort', [('message', msg)],
997 mandatory=False)
997 mandatory=False)
998 interpart.id = 0
998 interpart.id = 0
999 yield _pack(_fpayloadsize, -1)
999 yield _pack(_fpayloadsize, -1)
1000 for chunk in interpart.getchunks(ui=ui):
1000 for chunk in interpart.getchunks(ui=ui):
1001 yield chunk
1001 yield chunk
1002 outdebug(ui, 'closing payload chunk')
1002 outdebug(ui, 'closing payload chunk')
1003 # abort current part payload
1003 # abort current part payload
1004 yield _pack(_fpayloadsize, 0)
1004 yield _pack(_fpayloadsize, 0)
1005 if pycompat.ispy3:
1005 if pycompat.ispy3:
1006 raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
1006 raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
1007 else:
1007 else:
1008 exec("""raise exc_info[0], exc_info[1], exc_info[2]""")
1008 exec("""raise exc_info[0], exc_info[1], exc_info[2]""")
1009 # end of payload
1009 # end of payload
1010 outdebug(ui, 'closing payload chunk')
1010 outdebug(ui, 'closing payload chunk')
1011 yield _pack(_fpayloadsize, 0)
1011 yield _pack(_fpayloadsize, 0)
1012 self._generated = True
1012 self._generated = True
1013
1013
1014 def _payloadchunks(self):
1014 def _payloadchunks(self):
1015 """yield chunks of a the part payload
1015 """yield chunks of a the part payload
1016
1016
1017 Exists to handle the different methods to provide data to a part."""
1017 Exists to handle the different methods to provide data to a part."""
1018 # we only support fixed size data now.
1018 # we only support fixed size data now.
1019 # This will be improved in the future.
1019 # This will be improved in the future.
1020 if util.safehasattr(self.data, 'next'):
1020 if util.safehasattr(self.data, 'next'):
1021 buff = util.chunkbuffer(self.data)
1021 buff = util.chunkbuffer(self.data)
1022 chunk = buff.read(preferedchunksize)
1022 chunk = buff.read(preferedchunksize)
1023 while chunk:
1023 while chunk:
1024 yield chunk
1024 yield chunk
1025 chunk = buff.read(preferedchunksize)
1025 chunk = buff.read(preferedchunksize)
1026 elif len(self.data):
1026 elif len(self.data):
1027 yield self.data
1027 yield self.data
1028
1028
1029
1029
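# Illustrative sketch (not part of the original file): building and
# serializing a part by hand. The part type 'x-example' and the helper name
# below are hypothetical; in normal use bundle20.newpart() creates the part
# and bundle20.addpart() assigns its id before getchunks() is called.
def _examplebuildpart(ui):
    part = bundlepart('x-example', data='some payload')
    part.addparam('lang', 'en', mandatory=False)
    part.id = 0  # normally assigned when the part is added to a bundle20
    return ''.join(part.getchunks(ui))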
1030 flaginterrupt = -1
1030 flaginterrupt = -1
1031
1031
1032 class interrupthandler(unpackermixin):
1032 class interrupthandler(unpackermixin):
1033 """read one part and process it with restricted capability
1033 """read one part and process it with restricted capability
1034
1034
1035 This allows transmitting exceptions raised on the producer side during part
1035 This allows transmitting exceptions raised on the producer side during part
1036 iteration while the consumer is reading a part.
1036 iteration while the consumer is reading a part.
1037
1037
1038 Parts processed in this manner only have access to a ui object."""
1038 Parts processed in this manner only have access to a ui object."""
1039
1039
1040 def __init__(self, ui, fp):
1040 def __init__(self, ui, fp):
1041 super(interrupthandler, self).__init__(fp)
1041 super(interrupthandler, self).__init__(fp)
1042 self.ui = ui
1042 self.ui = ui
1043
1043
1044 def _readpartheader(self):
1044 def _readpartheader(self):
1045 """reads a part header size and return the bytes blob
1045 """reads a part header size and return the bytes blob
1046
1046
1047 returns None if empty"""
1047 returns None if empty"""
1048 headersize = self._unpack(_fpartheadersize)[0]
1048 headersize = self._unpack(_fpartheadersize)[0]
1049 if headersize < 0:
1049 if headersize < 0:
1050 raise error.BundleValueError('negative part header size: %i'
1050 raise error.BundleValueError('negative part header size: %i'
1051 % headersize)
1051 % headersize)
1052 indebug(self.ui, 'part header size: %i' % headersize)
1052 indebug(self.ui, 'part header size: %i' % headersize)
1053 if headersize:
1053 if headersize:
1054 return self._readexact(headersize)
1054 return self._readexact(headersize)
1055 return None
1055 return None
1056
1056
1057 def __call__(self):
1057 def __call__(self):
1058
1058
1059 self.ui.debug('bundle2-input-stream-interrupt:'
1059 self.ui.debug('bundle2-input-stream-interrupt:'
1060 ' opening out of band context\n')
1060 ' opening out of band context\n')
1061 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1061 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1062 headerblock = self._readpartheader()
1062 headerblock = self._readpartheader()
1063 if headerblock is None:
1063 if headerblock is None:
1064 indebug(self.ui, 'no part found during interruption.')
1064 indebug(self.ui, 'no part found during interruption.')
1065 return
1065 return
1066 part = unbundlepart(self.ui, headerblock, self._fp)
1066 part = unbundlepart(self.ui, headerblock, self._fp)
1067 op = interruptoperation(self.ui)
1067 op = interruptoperation(self.ui)
1068 _processpart(op, part)
1068 _processpart(op, part)
1069 self.ui.debug('bundle2-input-stream-interrupt:'
1069 self.ui.debug('bundle2-input-stream-interrupt:'
1070 ' closing out of band context\n')
1070 ' closing out of band context\n')
1071
1071
1072 class interruptoperation(object):
1072 class interruptoperation(object):
1073 """A limited operation to be use by part handler during interruption
1073 """A limited operation to be use by part handler during interruption
1074
1074
1075 It only has access to a ui object.
1075 It only has access to a ui object.
1076 """
1076 """
1077
1077
1078 def __init__(self, ui):
1078 def __init__(self, ui):
1079 self.ui = ui
1079 self.ui = ui
1080 self.reply = None
1080 self.reply = None
1081 self.captureoutput = False
1081 self.captureoutput = False
1082
1082
1083 @property
1083 @property
1084 def repo(self):
1084 def repo(self):
1085 raise RuntimeError('no repo access from stream interruption')
1085 raise error.ProgrammingError('no repo access from stream interruption')
1086
1086
1087 def gettransaction(self):
1087 def gettransaction(self):
1088 raise TransactionUnavailable('no repo access from stream interruption')
1088 raise TransactionUnavailable('no repo access from stream interruption')
1089
1089
1090 class unbundlepart(unpackermixin):
1090 class unbundlepart(unpackermixin):
1091 """a bundle part read from a bundle"""
1091 """a bundle part read from a bundle"""
1092
1092
1093 def __init__(self, ui, header, fp):
1093 def __init__(self, ui, header, fp):
1094 super(unbundlepart, self).__init__(fp)
1094 super(unbundlepart, self).__init__(fp)
1095 self.ui = ui
1095 self.ui = ui
1096 # unbundle state attr
1096 # unbundle state attr
1097 self._headerdata = header
1097 self._headerdata = header
1098 self._headeroffset = 0
1098 self._headeroffset = 0
1099 self._initialized = False
1099 self._initialized = False
1100 self.consumed = False
1100 self.consumed = False
1101 # part data
1101 # part data
1102 self.id = None
1102 self.id = None
1103 self.type = None
1103 self.type = None
1104 self.mandatoryparams = None
1104 self.mandatoryparams = None
1105 self.advisoryparams = None
1105 self.advisoryparams = None
1106 self.params = None
1106 self.params = None
1107 self.mandatorykeys = ()
1107 self.mandatorykeys = ()
1108 self._payloadstream = None
1108 self._payloadstream = None
1109 self._readheader()
1109 self._readheader()
1110 self._mandatory = None
1110 self._mandatory = None
1111 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1111 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1112 self._pos = 0
1112 self._pos = 0
1113
1113
1114 def _fromheader(self, size):
1114 def _fromheader(self, size):
1115 """return the next <size> byte from the header"""
1115 """return the next <size> byte from the header"""
1116 offset = self._headeroffset
1116 offset = self._headeroffset
1117 data = self._headerdata[offset:(offset + size)]
1117 data = self._headerdata[offset:(offset + size)]
1118 self._headeroffset = offset + size
1118 self._headeroffset = offset + size
1119 return data
1119 return data
1120
1120
1121 def _unpackheader(self, format):
1121 def _unpackheader(self, format):
1122 """read given format from header
1122 """read given format from header
1123
1123
1124 This automatically computes the size of the format to read.
1124 This automatically computes the size of the format to read.
1125 data = self._fromheader(struct.calcsize(format))
1125 data = self._fromheader(struct.calcsize(format))
1126 return _unpack(format, data)
1126 return _unpack(format, data)
1127
1127
1128 def _initparams(self, mandatoryparams, advisoryparams):
1128 def _initparams(self, mandatoryparams, advisoryparams):
1129 """internal function to setup all logic related parameters"""
1129 """internal function to setup all logic related parameters"""
1130 # make it read only to prevent people touching it by mistake.
1130 # make it read only to prevent people touching it by mistake.
1131 self.mandatoryparams = tuple(mandatoryparams)
1131 self.mandatoryparams = tuple(mandatoryparams)
1132 self.advisoryparams = tuple(advisoryparams)
1132 self.advisoryparams = tuple(advisoryparams)
1133 # user friendly UI
1133 # user friendly UI
1134 self.params = util.sortdict(self.mandatoryparams)
1134 self.params = util.sortdict(self.mandatoryparams)
1135 self.params.update(self.advisoryparams)
1135 self.params.update(self.advisoryparams)
1136 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1136 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1137
1137
1138 def _payloadchunks(self, chunknum=0):
1138 def _payloadchunks(self, chunknum=0):
1139 '''seek to specified chunk and start yielding data'''
1139 '''seek to specified chunk and start yielding data'''
1140 if len(self._chunkindex) == 0:
1140 if len(self._chunkindex) == 0:
1141 assert chunknum == 0, 'Must start with chunk 0'
1141 assert chunknum == 0, 'Must start with chunk 0'
1142 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1142 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1143 else:
1143 else:
1144 assert chunknum < len(self._chunkindex), \
1144 assert chunknum < len(self._chunkindex), \
1145 'Unknown chunk %d' % chunknum
1145 'Unknown chunk %d' % chunknum
1146 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1146 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1147
1147
1148 pos = self._chunkindex[chunknum][0]
1148 pos = self._chunkindex[chunknum][0]
1149 payloadsize = self._unpack(_fpayloadsize)[0]
1149 payloadsize = self._unpack(_fpayloadsize)[0]
1150 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1150 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1151 while payloadsize:
1151 while payloadsize:
1152 if payloadsize == flaginterrupt:
1152 if payloadsize == flaginterrupt:
1153 # interruption detection, the handler will now read a
1153 # interruption detection, the handler will now read a
1154 # single part and process it.
1154 # single part and process it.
1155 interrupthandler(self.ui, self._fp)()
1155 interrupthandler(self.ui, self._fp)()
1156 elif payloadsize < 0:
1156 elif payloadsize < 0:
1157 msg = 'negative payload chunk size: %i' % payloadsize
1157 msg = 'negative payload chunk size: %i' % payloadsize
1158 raise error.BundleValueError(msg)
1158 raise error.BundleValueError(msg)
1159 else:
1159 else:
1160 result = self._readexact(payloadsize)
1160 result = self._readexact(payloadsize)
1161 chunknum += 1
1161 chunknum += 1
1162 pos += payloadsize
1162 pos += payloadsize
1163 if chunknum == len(self._chunkindex):
1163 if chunknum == len(self._chunkindex):
1164 self._chunkindex.append((pos,
1164 self._chunkindex.append((pos,
1165 super(unbundlepart, self).tell()))
1165 super(unbundlepart, self).tell()))
1166 yield result
1166 yield result
1167 payloadsize = self._unpack(_fpayloadsize)[0]
1167 payloadsize = self._unpack(_fpayloadsize)[0]
1168 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1168 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1169
1169
1170 def _findchunk(self, pos):
1170 def _findchunk(self, pos):
1171 '''for a given payload position, return a chunk number and offset'''
1171 '''for a given payload position, return a chunk number and offset'''
1172 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1172 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1173 if ppos == pos:
1173 if ppos == pos:
1174 return chunk, 0
1174 return chunk, 0
1175 elif ppos > pos:
1175 elif ppos > pos:
1176 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1176 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1177 raise ValueError('Unknown chunk')
1177 raise ValueError('Unknown chunk')
1178
1178
1179 def _readheader(self):
1179 def _readheader(self):
1180 """read the header and setup the object"""
1180 """read the header and setup the object"""
1181 typesize = self._unpackheader(_fparttypesize)[0]
1181 typesize = self._unpackheader(_fparttypesize)[0]
1182 self.type = self._fromheader(typesize)
1182 self.type = self._fromheader(typesize)
1183 indebug(self.ui, 'part type: "%s"' % self.type)
1183 indebug(self.ui, 'part type: "%s"' % self.type)
1184 self.id = self._unpackheader(_fpartid)[0]
1184 self.id = self._unpackheader(_fpartid)[0]
1185 indebug(self.ui, 'part id: "%s"' % self.id)
1185 indebug(self.ui, 'part id: "%s"' % self.id)
1186 # extract mandatory bit from type
1186 # extract mandatory bit from type
1187 self.mandatory = (self.type != self.type.lower())
1187 self.mandatory = (self.type != self.type.lower())
1188 self.type = self.type.lower()
1188 self.type = self.type.lower()
1189 ## reading parameters
1189 ## reading parameters
1190 # param count
1190 # param count
1191 mancount, advcount = self._unpackheader(_fpartparamcount)
1191 mancount, advcount = self._unpackheader(_fpartparamcount)
1192 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1192 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1193 # param size
1193 # param size
1194 fparamsizes = _makefpartparamsizes(mancount + advcount)
1194 fparamsizes = _makefpartparamsizes(mancount + advcount)
1195 paramsizes = self._unpackheader(fparamsizes)
1195 paramsizes = self._unpackheader(fparamsizes)
1196 # make it a list of pairs again
1196 # make it a list of pairs again
1197 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1197 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1198 # split mandatory from advisory
1198 # split mandatory from advisory
1199 mansizes = paramsizes[:mancount]
1199 mansizes = paramsizes[:mancount]
1200 advsizes = paramsizes[mancount:]
1200 advsizes = paramsizes[mancount:]
1201 # retrieve param value
1201 # retrieve param value
1202 manparams = []
1202 manparams = []
1203 for key, value in mansizes:
1203 for key, value in mansizes:
1204 manparams.append((self._fromheader(key), self._fromheader(value)))
1204 manparams.append((self._fromheader(key), self._fromheader(value)))
1205 advparams = []
1205 advparams = []
1206 for key, value in advsizes:
1206 for key, value in advsizes:
1207 advparams.append((self._fromheader(key), self._fromheader(value)))
1207 advparams.append((self._fromheader(key), self._fromheader(value)))
1208 self._initparams(manparams, advparams)
1208 self._initparams(manparams, advparams)
1209 ## part payload
1209 ## part payload
1210 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1210 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1211 # the header has been read; record that the part is initialized
1211 # the header has been read; record that the part is initialized
1212 self._initialized = True
1212 self._initialized = True
1213
1213
1214 def read(self, size=None):
1214 def read(self, size=None):
1215 """read payload data"""
1215 """read payload data"""
1216 if not self._initialized:
1216 if not self._initialized:
1217 self._readheader()
1217 self._readheader()
1218 if size is None:
1218 if size is None:
1219 data = self._payloadstream.read()
1219 data = self._payloadstream.read()
1220 else:
1220 else:
1221 data = self._payloadstream.read(size)
1221 data = self._payloadstream.read(size)
1222 self._pos += len(data)
1222 self._pos += len(data)
1223 if size is None or len(data) < size:
1223 if size is None or len(data) < size:
1224 if not self.consumed and self._pos:
1224 if not self.consumed and self._pos:
1225 self.ui.debug('bundle2-input-part: total payload size %i\n'
1225 self.ui.debug('bundle2-input-part: total payload size %i\n'
1226 % self._pos)
1226 % self._pos)
1227 self.consumed = True
1227 self.consumed = True
1228 return data
1228 return data
1229
1229
1230 def tell(self):
1230 def tell(self):
1231 return self._pos
1231 return self._pos
1232
1232
1233 def seek(self, offset, whence=0):
1233 def seek(self, offset, whence=0):
1234 if whence == 0:
1234 if whence == 0:
1235 newpos = offset
1235 newpos = offset
1236 elif whence == 1:
1236 elif whence == 1:
1237 newpos = self._pos + offset
1237 newpos = self._pos + offset
1238 elif whence == 2:
1238 elif whence == 2:
1239 if not self.consumed:
1239 if not self.consumed:
1240 self.read()
1240 self.read()
1241 newpos = self._chunkindex[-1][0] - offset
1241 newpos = self._chunkindex[-1][0] - offset
1242 else:
1242 else:
1243 raise ValueError('Unknown whence value: %r' % (whence,))
1243 raise ValueError('Unknown whence value: %r' % (whence,))
1244
1244
1245 if newpos > self._chunkindex[-1][0] and not self.consumed:
1245 if newpos > self._chunkindex[-1][0] and not self.consumed:
1246 self.read()
1246 self.read()
1247 if not 0 <= newpos <= self._chunkindex[-1][0]:
1247 if not 0 <= newpos <= self._chunkindex[-1][0]:
1248 raise ValueError('Offset out of range')
1248 raise ValueError('Offset out of range')
1249
1249
1250 if self._pos != newpos:
1250 if self._pos != newpos:
1251 chunk, internaloffset = self._findchunk(newpos)
1251 chunk, internaloffset = self._findchunk(newpos)
1252 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1252 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1253 adjust = self.read(internaloffset)
1253 adjust = self.read(internaloffset)
1254 if len(adjust) != internaloffset:
1254 if len(adjust) != internaloffset:
1255 raise error.Abort(_('Seek failed\n'))
1255 raise error.Abort(_('Seek failed\n'))
1256 self._pos = newpos
1256 self._pos = newpos
1257
1257
1258 # These are only the static capabilities.
1258 # These are only the static capabilities.
1259 # Check the 'getrepocaps' function for the rest.
1259 # Check the 'getrepocaps' function for the rest.
1260 capabilities = {'HG20': (),
1260 capabilities = {'HG20': (),
1261 'error': ('abort', 'unsupportedcontent', 'pushraced',
1261 'error': ('abort', 'unsupportedcontent', 'pushraced',
1262 'pushkey'),
1262 'pushkey'),
1263 'listkeys': (),
1263 'listkeys': (),
1264 'pushkey': (),
1264 'pushkey': (),
1265 'digests': tuple(sorted(util.DIGESTS.keys())),
1265 'digests': tuple(sorted(util.DIGESTS.keys())),
1266 'remote-changegroup': ('http', 'https'),
1266 'remote-changegroup': ('http', 'https'),
1267 'hgtagsfnodes': (),
1267 'hgtagsfnodes': (),
1268 }
1268 }
1269
1269
1270 def getrepocaps(repo, allowpushback=False):
1270 def getrepocaps(repo, allowpushback=False):
1271 """return the bundle2 capabilities for a given repo
1271 """return the bundle2 capabilities for a given repo
1272
1272
1273 Exists to allow extensions (like evolution) to mutate the capabilities.
1273 Exists to allow extensions (like evolution) to mutate the capabilities.
1274 """
1274 """
1275 caps = capabilities.copy()
1275 caps = capabilities.copy()
1276 caps['changegroup'] = tuple(sorted(
1276 caps['changegroup'] = tuple(sorted(
1277 changegroup.supportedincomingversions(repo)))
1277 changegroup.supportedincomingversions(repo)))
1278 if obsolete.isenabled(repo, obsolete.exchangeopt):
1278 if obsolete.isenabled(repo, obsolete.exchangeopt):
1279 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1279 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1280 caps['obsmarkers'] = supportedformat
1280 caps['obsmarkers'] = supportedformat
1281 if allowpushback:
1281 if allowpushback:
1282 caps['pushback'] = ()
1282 caps['pushback'] = ()
1283 return caps
1283 return caps
1284
1284
1285 def bundle2caps(remote):
1285 def bundle2caps(remote):
1286 """return the bundle capabilities of a peer as dict"""
1286 """return the bundle capabilities of a peer as dict"""
1287 raw = remote.capable('bundle2')
1287 raw = remote.capable('bundle2')
1288 if not raw and raw != '':
1288 if not raw and raw != '':
1289 return {}
1289 return {}
1290 capsblob = urlreq.unquote(remote.capable('bundle2'))
1290 capsblob = urlreq.unquote(remote.capable('bundle2'))
1291 return decodecaps(capsblob)
1291 return decodecaps(capsblob)
1292
1292
1293 def obsmarkersversion(caps):
1293 def obsmarkersversion(caps):
1294 """extract the list of supported obsmarkers versions from a bundle2caps dict
1294 """extract the list of supported obsmarkers versions from a bundle2caps dict
1295 """
1295 """
1296 obscaps = caps.get('obsmarkers', ())
1296 obscaps = caps.get('obsmarkers', ())
1297 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1297 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1298
1298
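# Illustrative example (hypothetical input): obsmarkersversion() maps the
# advertised 'obsmarkers' capability values to integer format versions,
# e.g. obsmarkersversion({'obsmarkers': ('V0', 'V1')}) returns [0, 1], and
# an empty list when the capability is absent.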
1299 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1299 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1300 compopts=None):
1300 compopts=None):
1301 """Write a bundle file and return its filename.
1301 """Write a bundle file and return its filename.
1302
1302
1303 Existing files will not be overwritten.
1303 Existing files will not be overwritten.
1304 If no filename is specified, a temporary file is created.
1304 If no filename is specified, a temporary file is created.
1305 bz2 compression can be turned off.
1305 bz2 compression can be turned off.
1306 The bundle file will be deleted in case of errors.
1306 The bundle file will be deleted in case of errors.
1307 """
1307 """
1308
1308
1309 if bundletype == "HG20":
1309 if bundletype == "HG20":
1310 bundle = bundle20(ui)
1310 bundle = bundle20(ui)
1311 bundle.setcompression(compression, compopts)
1311 bundle.setcompression(compression, compopts)
1312 part = bundle.newpart('changegroup', data=cg.getchunks())
1312 part = bundle.newpart('changegroup', data=cg.getchunks())
1313 part.addparam('version', cg.version)
1313 part.addparam('version', cg.version)
1314 if 'clcount' in cg.extras:
1314 if 'clcount' in cg.extras:
1315 part.addparam('nbchanges', str(cg.extras['clcount']),
1315 part.addparam('nbchanges', str(cg.extras['clcount']),
1316 mandatory=False)
1316 mandatory=False)
1317 chunkiter = bundle.getchunks()
1317 chunkiter = bundle.getchunks()
1318 else:
1318 else:
1319 # compression argument is only for the bundle2 case
1319 # compression argument is only for the bundle2 case
1320 assert compression is None
1320 assert compression is None
1321 if cg.version != '01':
1321 if cg.version != '01':
1322 raise error.Abort(_('old bundle types only support v1 '
1322 raise error.Abort(_('old bundle types only support v1 '
1323 'changegroups'))
1323 'changegroups'))
1324 header, comp = bundletypes[bundletype]
1324 header, comp = bundletypes[bundletype]
1325 if comp not in util.compengines.supportedbundletypes:
1325 if comp not in util.compengines.supportedbundletypes:
1326 raise error.Abort(_('unknown stream compression type: %s')
1326 raise error.Abort(_('unknown stream compression type: %s')
1327 % comp)
1327 % comp)
1328 compengine = util.compengines.forbundletype(comp)
1328 compengine = util.compengines.forbundletype(comp)
1329 def chunkiter():
1329 def chunkiter():
1330 yield header
1330 yield header
1331 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1331 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1332 yield chunk
1332 yield chunk
1333 chunkiter = chunkiter()
1333 chunkiter = chunkiter()
1334
1334
1335 # parse the changegroup data, otherwise we will block
1335 # parse the changegroup data, otherwise we will block
1336 # in case of sshrepo because we don't know the end of the stream
1336 # in case of sshrepo because we don't know the end of the stream
1337 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1337 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1338
1338
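# Illustrative uses (hypothetical filenames; `cg` is a changegroup object
# obtained elsewhere): write an uncompressed bundle2 file, or a legacy bzip2
# bundle where `compression` must stay None because the bundletype header
# already encodes it and `cg.version` must be '01':
#
#     writebundle(ui, cg, 'example.hg', 'HG20', compression='UN')
#     writebundle(ui, cg, 'example-v1.hg', 'HG10BZ')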
1339 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1339 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1340 def handlechangegroup(op, inpart):
1340 def handlechangegroup(op, inpart):
1341 """apply a changegroup part on the repo
1341 """apply a changegroup part on the repo
1342
1342
1343 This is a very early implementation that will be massively reworked before
1343 This is a very early implementation that will be massively reworked before
1344 being inflicted on any end-user.
1344 being inflicted on any end-user.
1345 """
1345 """
1346 # Make sure we trigger a transaction creation
1346 # Make sure we trigger a transaction creation
1347 #
1347 #
1348 # The addchangegroup function will get a transaction object by itself, but
1348 # The addchangegroup function will get a transaction object by itself, but
1349 # we need to make sure we trigger the creation of a transaction object used
1349 # we need to make sure we trigger the creation of a transaction object used
1350 # for the whole processing scope.
1350 # for the whole processing scope.
1351 op.gettransaction()
1351 op.gettransaction()
1352 unpackerversion = inpart.params.get('version', '01')
1352 unpackerversion = inpart.params.get('version', '01')
1353 # We should raise an appropriate exception here
1353 # We should raise an appropriate exception here
1354 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1354 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1355 # the source and url passed here are overwritten by the one contained in
1355 # the source and url passed here are overwritten by the one contained in
1356 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1356 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1357 nbchangesets = None
1357 nbchangesets = None
1358 if 'nbchanges' in inpart.params:
1358 if 'nbchanges' in inpart.params:
1359 nbchangesets = int(inpart.params.get('nbchanges'))
1359 nbchangesets = int(inpart.params.get('nbchanges'))
1360 if ('treemanifest' in inpart.params and
1360 if ('treemanifest' in inpart.params and
1361 'treemanifest' not in op.repo.requirements):
1361 'treemanifest' not in op.repo.requirements):
1362 if len(op.repo.changelog) != 0:
1362 if len(op.repo.changelog) != 0:
1363 raise error.Abort(_(
1363 raise error.Abort(_(
1364 "bundle contains tree manifests, but local repo is "
1364 "bundle contains tree manifests, but local repo is "
1365 "non-empty and does not use tree manifests"))
1365 "non-empty and does not use tree manifests"))
1366 op.repo.requirements.add('treemanifest')
1366 op.repo.requirements.add('treemanifest')
1367 op.repo._applyopenerreqs()
1367 op.repo._applyopenerreqs()
1368 op.repo._writerequirements()
1368 op.repo._writerequirements()
1369 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1369 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1370 op.records.add('changegroup', {'return': ret})
1370 op.records.add('changegroup', {'return': ret})
1371 if op.reply is not None:
1371 if op.reply is not None:
1372 # This is definitely not the final form of this
1372 # This is definitely not the final form of this
1373 # return. But one needs to start somewhere.
1373 # return. But one needs to start somewhere.
1374 part = op.reply.newpart('reply:changegroup', mandatory=False)
1374 part = op.reply.newpart('reply:changegroup', mandatory=False)
1375 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1375 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1376 part.addparam('return', '%i' % ret, mandatory=False)
1376 part.addparam('return', '%i' % ret, mandatory=False)
1377 assert not inpart.read()
1377 assert not inpart.read()
1378
1378
1379 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1379 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1380 ['digest:%s' % k for k in util.DIGESTS.keys()])
1380 ['digest:%s' % k for k in util.DIGESTS.keys()])
1381 @parthandler('remote-changegroup', _remotechangegroupparams)
1381 @parthandler('remote-changegroup', _remotechangegroupparams)
1382 def handleremotechangegroup(op, inpart):
1382 def handleremotechangegroup(op, inpart):
1383 """apply a bundle10 on the repo, given an url and validation information
1383 """apply a bundle10 on the repo, given an url and validation information
1384
1384
1385 All the information about the remote bundle to import is given as
1385 All the information about the remote bundle to import is given as
1386 parameters. The parameters include:
1386 parameters. The parameters include:
1387 - url: the url to the bundle10.
1387 - url: the url to the bundle10.
1388 - size: the bundle10 file size. It is used to validate what was
1388 - size: the bundle10 file size. It is used to validate what was
1389 retrieved by the client matches the server knowledge about the bundle.
1389 retrieved by the client matches the server knowledge about the bundle.
1390 - digests: a space separated list of the digest types provided as
1390 - digests: a space separated list of the digest types provided as
1391 parameters.
1391 parameters.
1392 - digest:<digest-type>: the hexadecimal representation of the digest with
1392 - digest:<digest-type>: the hexadecimal representation of the digest with
1393 that name. Like the size, it is used to validate what was retrieved by
1393 that name. Like the size, it is used to validate what was retrieved by
1394 the client matches what the server knows about the bundle.
1394 the client matches what the server knows about the bundle.
1395
1395
1396 When multiple digest types are given, all of them are checked.
1396 When multiple digest types are given, all of them are checked.
1397 """
1397 """
1398 try:
1398 try:
1399 raw_url = inpart.params['url']
1399 raw_url = inpart.params['url']
1400 except KeyError:
1400 except KeyError:
1401 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1401 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1402 parsed_url = util.url(raw_url)
1402 parsed_url = util.url(raw_url)
1403 if parsed_url.scheme not in capabilities['remote-changegroup']:
1403 if parsed_url.scheme not in capabilities['remote-changegroup']:
1404 raise error.Abort(_('remote-changegroup does not support %s urls') %
1404 raise error.Abort(_('remote-changegroup does not support %s urls') %
1405 parsed_url.scheme)
1405 parsed_url.scheme)
1406
1406
1407 try:
1407 try:
1408 size = int(inpart.params['size'])
1408 size = int(inpart.params['size'])
1409 except ValueError:
1409 except ValueError:
1410 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1410 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1411 % 'size')
1411 % 'size')
1412 except KeyError:
1412 except KeyError:
1413 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1413 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1414
1414
1415 digests = {}
1415 digests = {}
1416 for typ in inpart.params.get('digests', '').split():
1416 for typ in inpart.params.get('digests', '').split():
1417 param = 'digest:%s' % typ
1417 param = 'digest:%s' % typ
1418 try:
1418 try:
1419 value = inpart.params[param]
1419 value = inpart.params[param]
1420 except KeyError:
1420 except KeyError:
1421 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1421 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1422 param)
1422 param)
1423 digests[typ] = value
1423 digests[typ] = value
1424
1424
1425 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1425 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1426
1426
1427 # Make sure we trigger a transaction creation
1427 # Make sure we trigger a transaction creation
1428 #
1428 #
1429 # The addchangegroup function will get a transaction object by itself, but
1429 # The addchangegroup function will get a transaction object by itself, but
1430 # we need to make sure we trigger the creation of a transaction object used
1430 # we need to make sure we trigger the creation of a transaction object used
1431 # for the whole processing scope.
1431 # for the whole processing scope.
1432 op.gettransaction()
1432 op.gettransaction()
1433 from . import exchange
1433 from . import exchange
1434 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1434 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1435 if not isinstance(cg, changegroup.cg1unpacker):
1435 if not isinstance(cg, changegroup.cg1unpacker):
1436 raise error.Abort(_('%s: not a bundle version 1.0') %
1436 raise error.Abort(_('%s: not a bundle version 1.0') %
1437 util.hidepassword(raw_url))
1437 util.hidepassword(raw_url))
1438 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1438 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1439 op.records.add('changegroup', {'return': ret})
1439 op.records.add('changegroup', {'return': ret})
1440 if op.reply is not None:
1440 if op.reply is not None:
1441 # This is definitely not the final form of this
1441 # This is definitely not the final form of this
1442 # return. But one needs to start somewhere.
1442 # return. But one needs to start somewhere.
1443 part = op.reply.newpart('reply:changegroup')
1443 part = op.reply.newpart('reply:changegroup')
1444 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1444 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1445 part.addparam('return', '%i' % ret, mandatory=False)
1445 part.addparam('return', '%i' % ret, mandatory=False)
1446 try:
1446 try:
1447 real_part.validate()
1447 real_part.validate()
1448 except error.Abort as e:
1448 except error.Abort as e:
1449 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1449 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1450 (util.hidepassword(raw_url), str(e)))
1450 (util.hidepassword(raw_url), str(e)))
1451 assert not inpart.read()
1451 assert not inpart.read()
1452
1452
1453 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1453 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1454 def handlereplychangegroup(op, inpart):
1454 def handlereplychangegroup(op, inpart):
1455 ret = int(inpart.params['return'])
1455 ret = int(inpart.params['return'])
1456 replyto = int(inpart.params['in-reply-to'])
1456 replyto = int(inpart.params['in-reply-to'])
1457 op.records.add('changegroup', {'return': ret}, replyto)
1457 op.records.add('changegroup', {'return': ret}, replyto)
1458
1458
1459 @parthandler('check:heads')
1459 @parthandler('check:heads')
1460 def handlecheckheads(op, inpart):
1460 def handlecheckheads(op, inpart):
1461 """check that head of the repo did not change
1461 """check that head of the repo did not change
1462
1462
1463 This is used to detect a push race when using unbundle.
1463 This is used to detect a push race when using unbundle.
1464 This replaces the "heads" argument of unbundle."""
1464 This replaces the "heads" argument of unbundle."""
1465 h = inpart.read(20)
1465 h = inpart.read(20)
1466 heads = []
1466 heads = []
1467 while len(h) == 20:
1467 while len(h) == 20:
1468 heads.append(h)
1468 heads.append(h)
1469 h = inpart.read(20)
1469 h = inpart.read(20)
1470 assert not h
1470 assert not h
1471 # Trigger a transaction so that we are guaranteed to have the lock now.
1471 # Trigger a transaction so that we are guaranteed to have the lock now.
1472 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1472 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1473 op.gettransaction()
1473 op.gettransaction()
1474 if sorted(heads) != sorted(op.repo.heads()):
1474 if sorted(heads) != sorted(op.repo.heads()):
1475 raise error.PushRaced('repository changed while pushing - '
1475 raise error.PushRaced('repository changed while pushing - '
1476 'please try again')
1476 'please try again')
1477
1477
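# The 'check:heads' payload read above is nothing more than the expected
# heads concatenated as raw 20-byte binary nodes. A hedged sketch of the
# sending side (names are illustrative):
#
#     expectedheads = [...]  # nodes the client last saw on the server
#     bundler.newpart('check:heads', data=''.join(expectedheads))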
1478 @parthandler('output')
1478 @parthandler('output')
1479 def handleoutput(op, inpart):
1479 def handleoutput(op, inpart):
1480 """forward output captured on the server to the client"""
1480 """forward output captured on the server to the client"""
1481 for line in inpart.read().splitlines():
1481 for line in inpart.read().splitlines():
1482 op.ui.status(_('remote: %s\n') % line)
1482 op.ui.status(_('remote: %s\n') % line)
1483
1483
1484 @parthandler('replycaps')
1484 @parthandler('replycaps')
1485 def handlereplycaps(op, inpart):
1485 def handlereplycaps(op, inpart):
1486 """Notify that a reply bundle should be created
1486 """Notify that a reply bundle should be created
1487
1487
1488 The payload contains the capabilities information for the reply"""
1488 The payload contains the capabilities information for the reply"""
1489 caps = decodecaps(inpart.read())
1489 caps = decodecaps(inpart.read())
1490 if op.reply is None:
1490 if op.reply is None:
1491 op.reply = bundle20(op.ui, caps)
1491 op.reply = bundle20(op.ui, caps)
1492
1492
1493 class AbortFromPart(error.Abort):
1493 class AbortFromPart(error.Abort):
1494 """Sub-class of Abort that denotes an error from a bundle2 part."""
1494 """Sub-class of Abort that denotes an error from a bundle2 part."""
1495
1495
1496 @parthandler('error:abort', ('message', 'hint'))
1496 @parthandler('error:abort', ('message', 'hint'))
1497 def handleerrorabort(op, inpart):
1497 def handleerrorabort(op, inpart):
1498 """Used to transmit abort error over the wire"""
1498 """Used to transmit abort error over the wire"""
1499 raise AbortFromPart(inpart.params['message'],
1499 raise AbortFromPart(inpart.params['message'],
1500 hint=inpart.params.get('hint'))
1500 hint=inpart.params.get('hint'))
1501
1501
1502 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1502 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1503 'in-reply-to'))
1503 'in-reply-to'))
1504 def handleerrorpushkey(op, inpart):
1504 def handleerrorpushkey(op, inpart):
1505 """Used to transmit failure of a mandatory pushkey over the wire"""
1505 """Used to transmit failure of a mandatory pushkey over the wire"""
1506 kwargs = {}
1506 kwargs = {}
1507 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1507 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1508 value = inpart.params.get(name)
1508 value = inpart.params.get(name)
1509 if value is not None:
1509 if value is not None:
1510 kwargs[name] = value
1510 kwargs[name] = value
1511 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1511 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1512
1512
1513 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1513 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1514 def handleerrorunsupportedcontent(op, inpart):
1514 def handleerrorunsupportedcontent(op, inpart):
1515 """Used to transmit unknown content error over the wire"""
1515 """Used to transmit unknown content error over the wire"""
1516 kwargs = {}
1516 kwargs = {}
1517 parttype = inpart.params.get('parttype')
1517 parttype = inpart.params.get('parttype')
1518 if parttype is not None:
1518 if parttype is not None:
1519 kwargs['parttype'] = parttype
1519 kwargs['parttype'] = parttype
1520 params = inpart.params.get('params')
1520 params = inpart.params.get('params')
1521 if params is not None:
1521 if params is not None:
1522 kwargs['params'] = params.split('\0')
1522 kwargs['params'] = params.split('\0')
1523
1523
1524 raise error.BundleUnknownFeatureError(**kwargs)
1524 raise error.BundleUnknownFeatureError(**kwargs)
1525
1525
1526 @parthandler('error:pushraced', ('message',))
1526 @parthandler('error:pushraced', ('message',))
1527 def handleerrorpushraced(op, inpart):
1527 def handleerrorpushraced(op, inpart):
1528 """Used to transmit push race error over the wire"""
1528 """Used to transmit push race error over the wire"""
1529 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1529 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1530
1530
1531 @parthandler('listkeys', ('namespace',))
1531 @parthandler('listkeys', ('namespace',))
1532 def handlelistkeys(op, inpart):
1532 def handlelistkeys(op, inpart):
1533 """retrieve pushkey namespace content stored in a bundle2"""
1533 """retrieve pushkey namespace content stored in a bundle2"""
1534 namespace = inpart.params['namespace']
1534 namespace = inpart.params['namespace']
1535 r = pushkey.decodekeys(inpart.read())
1535 r = pushkey.decodekeys(inpart.read())
1536 op.records.add('listkeys', (namespace, r))
1536 op.records.add('listkeys', (namespace, r))
1537
1537
1538 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1538 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1539 def handlepushkey(op, inpart):
1539 def handlepushkey(op, inpart):
1540 """process a pushkey request"""
1540 """process a pushkey request"""
1541 dec = pushkey.decode
1541 dec = pushkey.decode
1542 namespace = dec(inpart.params['namespace'])
1542 namespace = dec(inpart.params['namespace'])
1543 key = dec(inpart.params['key'])
1543 key = dec(inpart.params['key'])
1544 old = dec(inpart.params['old'])
1544 old = dec(inpart.params['old'])
1545 new = dec(inpart.params['new'])
1545 new = dec(inpart.params['new'])
1546 # Grab the transaction to ensure that we have the lock before performing the
1546 # Grab the transaction to ensure that we have the lock before performing the
1547 # pushkey.
1547 # pushkey.
1548 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1548 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1549 op.gettransaction()
1549 op.gettransaction()
1550 ret = op.repo.pushkey(namespace, key, old, new)
1550 ret = op.repo.pushkey(namespace, key, old, new)
1551 record = {'namespace': namespace,
1551 record = {'namespace': namespace,
1552 'key': key,
1552 'key': key,
1553 'old': old,
1553 'old': old,
1554 'new': new}
1554 'new': new}
1555 op.records.add('pushkey', record)
1555 op.records.add('pushkey', record)
1556 if op.reply is not None:
1556 if op.reply is not None:
1557 rpart = op.reply.newpart('reply:pushkey')
1557 rpart = op.reply.newpart('reply:pushkey')
1558 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1558 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1559 rpart.addparam('return', '%i' % ret, mandatory=False)
1559 rpart.addparam('return', '%i' % ret, mandatory=False)
1560 if inpart.mandatory and not ret:
1560 if inpart.mandatory and not ret:
1561 kwargs = {}
1561 kwargs = {}
1562 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1562 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1563 if key in inpart.params:
1563 if key in inpart.params:
1564 kwargs[key] = inpart.params[key]
1564 kwargs[key] = inpart.params[key]
1565 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1565 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1566
1566
1567 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1567 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1568 def handlepushkeyreply(op, inpart):
1568 def handlepushkeyreply(op, inpart):
1569 """retrieve the result of a pushkey request"""
1569 """retrieve the result of a pushkey request"""
1570 ret = int(inpart.params['return'])
1570 ret = int(inpart.params['return'])
1571 partid = int(inpart.params['in-reply-to'])
1571 partid = int(inpart.params['in-reply-to'])
1572 op.records.add('pushkey', {'return': ret}, partid)
1572 op.records.add('pushkey', {'return': ret}, partid)
1573
1573
1574 @parthandler('obsmarkers')
1574 @parthandler('obsmarkers')
1575 def handleobsmarker(op, inpart):
1575 def handleobsmarker(op, inpart):
1576 """add a stream of obsmarkers to the repo"""
1576 """add a stream of obsmarkers to the repo"""
1577 tr = op.gettransaction()
1577 tr = op.gettransaction()
1578 markerdata = inpart.read()
1578 markerdata = inpart.read()
1579 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1579 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1580 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1580 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1581 % len(markerdata))
1581 % len(markerdata))
1582 # The mergemarkers call will crash if marker creation is not enabled.
1582 # The mergemarkers call will crash if marker creation is not enabled.
1583 # we want to avoid this if the part is advisory.
1583 # we want to avoid this if the part is advisory.
1584 if not inpart.mandatory and op.repo.obsstore.readonly:
1584 if not inpart.mandatory and op.repo.obsstore.readonly:
1585 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1585 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1586 return
1586 return
1587 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1587 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1588 if new:
1588 if new:
1589 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1589 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1590 op.records.add('obsmarkers', {'new': new})
1590 op.records.add('obsmarkers', {'new': new})
1591 if op.reply is not None:
1591 if op.reply is not None:
1592 rpart = op.reply.newpart('reply:obsmarkers')
1592 rpart = op.reply.newpart('reply:obsmarkers')
1593 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1593 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1594 rpart.addparam('new', '%i' % new, mandatory=False)
1594 rpart.addparam('new', '%i' % new, mandatory=False)
1595
1595
1596
1596
1597 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1597 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1598 def handleobsmarkerreply(op, inpart):
1598 def handleobsmarkerreply(op, inpart):
1599 """retrieve the result of a pushkey request"""
1599 """retrieve the result of a pushkey request"""
1600 ret = int(inpart.params['new'])
1600 ret = int(inpart.params['new'])
1601 partid = int(inpart.params['in-reply-to'])
1601 partid = int(inpart.params['in-reply-to'])
1602 op.records.add('obsmarkers', {'new': ret}, partid)
1602 op.records.add('obsmarkers', {'new': ret}, partid)
1603
1603
1604 @parthandler('hgtagsfnodes')
1604 @parthandler('hgtagsfnodes')
1605 def handlehgtagsfnodes(op, inpart):
1605 def handlehgtagsfnodes(op, inpart):
1606 """Applies .hgtags fnodes cache entries to the local repo.
1606 """Applies .hgtags fnodes cache entries to the local repo.
1607
1607
1608 Payload is pairs of 20 byte changeset nodes and filenodes.
1608 Payload is pairs of 20 byte changeset nodes and filenodes.
1609 """
1609 """
1610 # Grab the transaction so we ensure that we have the lock at this point.
1610 # Grab the transaction so we ensure that we have the lock at this point.
1611 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1611 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1612 op.gettransaction()
1612 op.gettransaction()
1613 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1613 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1614
1614
1615 count = 0
1615 count = 0
1616 while True:
1616 while True:
1617 node = inpart.read(20)
1617 node = inpart.read(20)
1618 fnode = inpart.read(20)
1618 fnode = inpart.read(20)
1619 if len(node) < 20 or len(fnode) < 20:
1619 if len(node) < 20 or len(fnode) < 20:
1620 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1620 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1621 break
1621 break
1622 cache.setfnode(node, fnode)
1622 cache.setfnode(node, fnode)
1623 count += 1
1623 count += 1
1624
1624
1625 cache.write()
1625 cache.write()
1626 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1626 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
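# Illustrative sketch (names hypothetical): the payload consumed above is
# concatenated (changeset node, .hgtags filenode) pairs of 20 bytes each:
#
#     data = ''.join(node + fnode for node, fnode in entries)
#     bundler.newpart('hgtagsfnodes', data=data, mandatory=False)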