bundle2: fix assertion that 'compression' hasn't been set...
Siddharth Agarwal
r30915:aa25989b stable
@@ -1,1622 +1,1622 @@
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows

 - magic string
 - stream level parameters
 - payload parts (any number)
 - end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

Binary format is as follows

:params size: int32

    The total number of bytes used by the parameters

:params value: arbitrary number of bytes

    A blob of `params size` containing the serialized version of all stream
    level parameters.

    The blob contains a space separated list of parameters. Parameters with
    value are stored in the form `<name>=<value>`. Both name and value are
    urlquoted.

    Empty names are obviously forbidden.

    Names MUST start with a letter. If this first letter is lower case, the
    parameter is advisory and can be safely ignored. However, when the first
    letter is capital, the parameter is mandatory and the bundling process
    MUST stop if it is not able to process it.

Stream parameters use a simple textual format for two main reasons:

- Stream level parameters should remain simple and we want to discourage any
  crazy usage.
- Textual data allow easy human inspection of a bundle2 header in case of
  trouble.

Any application level options MUST go into a bundle2 part instead.

Payload part
------------------------

Binary format is as follows

:header size: int32

    The total number of bytes used by the part header. When the header is
    empty (size = 0) this is interpreted as the end of stream marker.

:header:

    The header defines how to interpret the part. It contains two pieces of
    data: the part type, and the part parameters.

    The part type is used to route to an application level handler that can
    interpret the payload.

    Part parameters are passed to the application level handler. They are
    meant to convey information that will help the application level object
    to interpret the part payload.

    The binary format of the header is as follows

    :typesize: (one byte)

    :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

    :partid: A 32-bit integer (unique in the bundle) that can be used to
             refer to this part.

    :parameters:

        Part's parameters may have arbitrary content, the binary structure
        is::

            <mandatory-count><advisory-count><param-sizes><param-data>

        :mandatory-count: 1 byte, number of mandatory parameters

        :advisory-count:  1 byte, number of advisory parameters

        :param-sizes:

            N couples of bytes, where N is the total number of parameters.
            Each couple contains (<size-of-key>, <size-of-value>) for one
            parameter.

        :param-data:

            A blob of bytes from which each parameter key and value can be
            retrieved using the list of size couples stored in the previous
            field.

        Mandatory parameters come first, then the advisory ones.

        Each parameter's key MUST be unique within the part.

:payload:

    payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the Part
type contains any uppercase char it is considered mandatory. When no handler
is known for a Mandatory part, the process is aborted and an exception is
raised. If the part is advisory and no handler is known, the part is ignored.
When the process is aborted, the full bundle is still read from the stream to
keep the channel usable. But none of the parts read from an abort are
processed. In the future, dropping the stream may become an option for
channels we do not care to preserve.
"""

from __future__ import absolute_import

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    pushkey,
    pycompat,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>'+('BB'*nbparams)

parthandlermapping = {}
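
# --- Illustrative sketch (editor's addition, not part of the original file) ---
# The helper below shows how a part header following the binary layout
# documented in the module docstring could be packed with the struct formats
# defined above. The ``_demopartheader`` name is hypothetical and exists only
# for illustration.
def _demopartheader(parttype, partid, mandatoryparams=(), advisoryparams=()):
    """pack one part header: typesize, type, id, param counts, sizes, data"""
    allparams = list(mandatoryparams) + list(advisoryparams)
    chunks = [_pack(_fparttypesize, len(parttype)),
              parttype,
              _pack(_fpartid, partid),
              _pack(_fpartparamcount, len(mandatoryparams),
                    len(advisoryparams))]
    sizes = []
    for key, value in allparams:
        sizes.append(len(key))
        sizes.append(len(value))
    chunks.append(_pack(_makefpartparamsizes(len(allparams)), *sizes))
    for key, value in allparams:
        chunks.append(key)
        chunks.append(value)
    headerblob = ''.join(chunks)
    # on the wire the header blob is preceded by its size as a signed int32
    return _pack(_fpartheadersize, len(headerblob)) + headerblob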

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

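# --- Illustrative sketch (editor's addition, not part of the original file) ---
# Typical use of the record container above: entries are added per category
# and can be read back by category or iterated in chronological order. The
# ``_demorecords`` name and the sample entries are hypothetical.
def _demorecords():
    records = unbundlerecords()
    records.add('changegroup', {'return': 1})
    records.add('output', 'remote: processed\n')
    assert len(records['changegroup']) == 1
    # iteration yields ('category', entry) tuples in insertion order
    return [category for category, entry in records]
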
class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.gettransaction = transactiongetter
        self.reply = None
        self.captureoutput = captureoutput

class TransactionUnavailable(RuntimeError):
    pass


def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    tr.hookargs['bundle2'] = '1'
    if source is not None and 'source' not in tr.hookargs:
        tr.hookargs['source'] = source
    if url is not None and 'url' not in tr.hookargs:
        tr.hookargs['url'] = url
    return processbundle(repo, unbundler, lambda: tr, op=op)

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    This is a very early version of this function that will be strongly
    reworked before final usage.

    Unknown Mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op.gettransaction is None:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except Exception as exc:
        for nbpart, part in iterparts:
            # consume the bundle content
            part.seek(0, 2)
        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from processing the old format. This is mostly
        # needed to handle different return codes to unbundle according to the
        # type of bundle. We should probably clean up or drop this return code
        # craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged
        raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    hardabort = False
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                      params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam('in-reply-to', str(part.id), mandatory=False)
    # If exiting or interrupted, do not attempt to seek the stream in the
    # finally block below. This makes abort faster.
    except (SystemExit, KeyboardInterrupt):
        hardabort = True
        raise
    finally:
        # consume the part content to not corrupt the stream.
        if not hardabort:
            part.seek(0, 2)


def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

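# --- Illustrative sketch (editor's addition, not part of the original file) ---
# A small encodecaps/decodecaps round trip. The capability names used here are
# arbitrary examples, not a statement about what a server actually advertises.
def _democaps():
    caps = {'bundle2': ['HG20'], 'changegroup': ['01', '02']}
    blob = encodecaps(caps)
    # the blob is sorted, newline separated: "bundle2=HG20\nchangegroup=01,02"
    assert decodecaps(blob)['changegroup'] == ['01', '02']
    return blob
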
bundletypes = {
    "": ("", 'UN'),       # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
-        assert not any(n.lower() == 'Compression' for n, v in self._params)
+        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually create and add if you need better
        control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunk for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we preserve
        server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged


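# --- Illustrative sketch (editor's addition, not part of the original file) ---
# Assembling a small container with the bundle20 API described above: create
# the bundler, optionally enable compression (which adds the 'Compression'
# stream parameter guarded by the assertion fixed in this changeset), add
# parts, then concatenate the generated chunks. ``_demobundle`` is hypothetical.
def _demobundle(ui, compression=None):
    bundler = bundle20(ui)
    if compression is not None:
        bundler.setcompression(compression)
    part = bundler.newpart('output', data='hello from the server\n',
                           mandatory=False)
    part.addparam('in-reply-to', '0', mandatory=False)
    return ''.join(bundler.getchunks())
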
class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))

    def _unpack(self, format):
        """unpack this struct format from the stream"""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream"""
        return changegroup.readexactly(self._fp, size)

    def seek(self, offset, whence=0):
        """move the underlying file pointer"""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def tell(self):
        """return the file offset, or None if file is not seekable"""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

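# --- Illustrative sketch (editor's addition, not part of the original file) ---
# Reading a bundle back with getunbundler(): the magic string selects the
# unbundler class, stream parameters are decoded lazily through ``params``,
# and iterparts() yields each part in order, consuming its payload once the
# loop advances. The 'bundle.hg' file name is hypothetical.
def _demoread(ui):
    with open('bundle.hg', 'rb') as fp:
        unbundler = getunbundler(ui, fp)
        return [(part.type, part.mandatory)
                for part in unbundler.iterparts()]
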
class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory and this function will raise a KeyError when unknown.

        Note: no options are currently supported. Any input will be either
        ignored or failing.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact the 'getbundle' command over 'ssh'
        has no way to know when the reply ends, relying on the bundle to be
        interpreted to know its end. This is terrible and we are sorry, but we
        needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)


    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure param have been loaded
        self.params
        # From there, payload need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """reads a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in formatmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True

class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. Remote side
    should be able to safely ignore the advisory ones.

    Neither data nor parameters may be modified after the generation has begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise RuntimeError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise RuntimeError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % exc)
            exc_info = sys.exc_info()
            msg = 'unexpected error: %s' % exc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            if pycompat.ispy3:
                raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
            else:
                exec("""raise exc_info[0], exc_info[1], exc_info[2]""")
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

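    # Editor's note (illustrative, not part of the original module): on the
    # wire, a part produced by ``getchunks`` above is framed as
    #
    #     int32 header size | header chunk | repeated (int32 chunk size | chunk)
    #
    # and is terminated by a zero-size chunk. A minimal sketch of just the
    # payload framing, assuming signed big-endian 32-bit sizes as used by
    # ``_fpayloadsize``:
    #
    #     import struct
    #     def frame(chunks):
    #         for chunk in chunks:
    #             yield struct.pack('>i', len(chunk))
    #             yield chunk
    #         yield struct.pack('>i', 0)  # end-of-payload marker
    #
    # A negative size (``flaginterrupt`` below) is reserved to announce an
    # out-of-band part; see ``interrupthandler``.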
    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if util.safehasattr(self.data, 'next'):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


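# Editor's illustrative sketch (not part of the original module): putting a
# part together from the producer side. ``bundle20`` and ``newpart`` are the
# real entry points used elsewhere in this file; the part type, parameter and
# helper name below are chosen for illustration only.
def _examplebuildbundle(ui):
    bundler = bundle20(ui)
    part = bundler.newpart('output', data='hello from the server\n',
                           mandatory=False)
    # advisory parameter; receivers are free to ignore it
    part.addparam('origin', 'example', mandatory=False)
    return ''.join(bundler.getchunks())
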
flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting an exception raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise RuntimeError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] #(payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic-related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, super(unbundlepart, self).tell()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            super(unbundlepart, self).seek(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos,
                                             super(unbundlepart, self).tell()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

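    # Editor's note (illustrative, not part of the original module): the
    # ``_chunkindex`` list built above records, for each payload chunk, the
    # pair (payload offset, underlying file offset). This is what makes
    # ``seek`` below affordable: seeking backwards only needs to re-read from
    # the nearest indexed chunk instead of re-scanning the whole part. For
    # example, a part whose chunks are 10 and 20 bytes long would be indexed
    # roughly as [(0, f0), (10, f1), (30, f2)], where f0..f2 are hypothetical
    # file positions.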
    def _readheader(self):
        """read the header and set up the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % self.id)
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

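# Editor's illustrative sketch (not part of the original module): from a part
# handler's point of view, ``unbundlepart`` behaves like a read-only file. A
# hypothetical handler streaming its payload in fixed-size chunks could look
# like this (the part type is made up for illustration):
#
#     @parthandler('x-example:stream')
#     def _handleexamplestream(op, inpart):
#         while True:
#             chunk = inpart.read(32768)
#             if not chunk:
#                 break
#             op.ui.debug('got %i bytes\n' % len(chunk))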
# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as a dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

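# Editor's illustrative sketch (not part of the original module): a
# hypothetical helper combining the three functions above to find the
# obsmarker format versions understood by both ends of an exchange.
def _examplecommonobsversions(repo, remote):
    remoteversions = obsmarkersversion(bundle2caps(remote))
    localversions = obsmarkersversion(getrepocaps(repo))
    # versions both sides advertise; empty if either side has the feature off
    return sorted(set(remoteversions) & set(localversions))
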
def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', str(cg.extras['clcount']),
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only supports v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

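# Editor's illustrative sketch (not part of the original module): writing a
# changegroup out as a bundle2 file. ``cg`` is assumed to be a changegroup
# packer obtained elsewhere in Mercurial; the filename and the 'BZ'
# compression name are illustrative choices, not a recommendation.
def _examplewritebundle(ui, cg):
    return writebundle(ui, cg, 'example.hg', 'HG20', compression='BZ')
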
@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will be massively reworked
    before being inflicted on any end-user.
    """
    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the one contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

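# Editor's note (illustrative, not part of the original module): the reply
# pattern used above is shared by most handlers in this file. When the sender
# asked for a reply bundle (see the 'replycaps' handler below), the handler
# appends a ``reply:<parttype>`` part carrying two advisory parameters:
# 'in-reply-to', the id of the part being answered, and a handler-specific
# result such as 'return'. The matching ``reply:*`` handlers simply record
# that result, e.g.:
#
#     op.records.add('changegroup', {'return': ret}, replyto)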
_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                 ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given a url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate what was
        retrieved by the client matches the server knowledge about the bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest with
        that name. Like the size, it is used to validate what was retrieved by
        the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = cg.apply(op.repo, 'bundle2', 'bundle2')
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

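# Editor's illustrative sketch (not part of the original module): the
# 'check:heads' payload is nothing more than concatenated 20-byte binary
# nodes, as the reader above shows. A hypothetical producer on the client
# side could therefore be as simple as:
#
#     bundler.newpart('check:heads', data=iter(knownremoteheads))
#
# where ``knownremoteheads`` is assumed to be the list of binary node ids the
# client observed on the server when it started the push.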
@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # we want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers push"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
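
# Editor's illustrative sketch (not part of the original module): the
# 'hgtagsfnodes' payload is a flat sequence of (changeset node, .hgtags
# filenode) pairs, 20 bytes each, as the reader above expects. A hypothetical
# producer would simply interleave the two node streams:
#
#     def _hgtagsfnodesdata(pairs):
#         for node, fnode in pairs:
#             yield node
#             yield fnode
#
#     bundler.newpart('hgtagsfnodes', data=_hgtagsfnodesdata(pairs))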