bundle2: use compressstream compression engine API...
Gregory Szorc
r30357:5925bda4 default
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows:

- magic string
- stream level parameters
- payload parts (any number)
- end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

Stream level parameters
------------------------

The binary format is as follows:

:params size: int32

    The total number of bytes used by the parameters

:params value: arbitrary number of bytes

    A blob of `params size` containing the serialized version of all stream
    level parameters.

    The blob contains a space separated list of parameters. Parameters with a
    value are stored in the form `<name>=<value>`. Both name and value are
    urlquoted.

    Empty names are forbidden.

    A name MUST start with a letter. If this first letter is lower case, the
    parameter is advisory and can be safely ignored. However, when the first
    letter is capital, the parameter is mandatory and the bundling process
    MUST stop if it is not able to process it.

    Stream parameters use a simple textual format for two main reasons:

    - Stream level parameters should remain simple and we want to discourage
      any overly complex usage.
    - Textual data allows easy human inspection of a bundle2 header in case
      of trouble.

    Any application level options MUST go into a bundle2 part instead.

Payload part
------------------------

The binary format is as follows:

:header size: int32

    The total number of bytes used by the part header. When the header is
    empty (size = 0) this is interpreted as the end of stream marker.

:header:

    The header defines how to interpret the part. It contains two pieces of
    data: the part type, and the part parameters.

    The part type is used to route to an application level handler that can
    interpret the payload.

    Part parameters are passed to the application level handler. They are
    meant to convey information that will help the application level object
    interpret the part payload.

    The binary format of the header is as follows:

    :typesize: (one byte)

    :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

    :partid: A 32-bit integer (unique in the bundle) that can be used to
             refer to this part.

    :parameters:

        A part's parameters may have arbitrary content; the binary structure
        is::

            <mandatory-count><advisory-count><param-sizes><param-data>

        :mandatory-count: 1 byte, number of mandatory parameters

        :advisory-count: 1 byte, number of advisory parameters

        :param-sizes:

            N pairs of bytes, where N is the total number of parameters. Each
            pair contains (<size-of-key>, <size-of-value>) for one parameter.

        :param-data:

            A blob of bytes from which each parameter key and value can be
            retrieved using the list of size pairs stored in the previous
            field.

            Mandatory parameters come first, then the advisory ones.

            Each parameter's key MUST be unique within the part.

:payload:

    The payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part
type contains any uppercase char it is considered mandatory. When no handler
is known for a mandatory part, the process is aborted and an exception is
raised. If the part is advisory and no handler is known, the part is ignored.
When the process is aborted, the full bundle is still read from the stream to
keep the channel usable. But none of the parts read after an abort are
processed. In the future, dropping the stream may become an option for
channels we do not care to preserve.
"""
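The stream-level parameter encoding described in the docstring above can be illustrated with a small standalone sketch. This is not part of this module; the function names (`encodestreamparams`, `decodestreamparams`) are hypothetical, and Python 3's `urllib.parse` quoting stands in for `urlreq`:

```python
import struct
from urllib.parse import quote, unquote

def encodestreamparams(params):
    """Serialize stream-level parameters: a space separated list of
    urlquoted `<name>` or `<name>=<value>` entries, preceded by an
    int32 byte count (the `params size` field)."""
    blob = ' '.join(
        quote(name) if value is None
        else '%s=%s' % (quote(name), quote(value))
        for name, value in params
    ).encode('ascii')
    return struct.pack('>i', len(blob)) + blob

def decodestreamparams(data):
    """Parse the size-prefixed blob back into a list of (name, value)."""
    size = struct.unpack('>i', data[:4])[0]
    blob = data[4:4 + size].decode('ascii')
    params = []
    for entry in blob.split(' '):
        if '=' in entry:
            name, value = entry.split('=', 1)
            params.append((unquote(name), unquote(value)))
        else:
            params.append((unquote(entry), None))
    return params
```

Per the rules above, a name starting with a capital letter (e.g. `Compression`) would be mandatory, while a lowercase one would be advisory.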

from __future__ import absolute_import

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    pushkey,
    pycompat,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
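The `_fpayloadsize` format above ('>i', a signed big-endian int32) frames payload chunks as `<chunksize><chunkdata>`, terminated by a zero-size chunk, as described in the module docstring. A minimal sketch of that framing (the function names are illustrative, not part of this module):

```python
import struct

_fpayloadsize = '>i'  # signed, so negative sizes can flag special cases

def framechunks(chunks):
    """Yield the wire framing for a sequence of byte chunks."""
    for chunk in chunks:
        yield struct.pack(_fpayloadsize, len(chunk))
        yield chunk
    yield struct.pack(_fpayloadsize, 0)  # end-of-payload marker

def readchunks(data):
    """Parse framed payload bytes back into chunks."""
    offset = 0
    while True:
        (size,) = struct.unpack_from(_fpayloadsize, data, offset)
        offset += 4
        if size == 0:
            return
        if size < 0:
            # the format reserves negative sizes for special processing
            raise ValueError('special chunk sizes not supported here')
        yield data[offset:offset + size]
        offset += size
```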

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)
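The `_parttypeforbidden` pattern rejects any character outside `[a-zA-Z0-9_:-]`. A standalone check mirroring `validateparttype` (returning a boolean instead of raising, for illustration):

```python
import re

# same pattern as the module-level _parttypeforbidden
_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def isvalidparttype(parttype):
    # a single forbidden character makes the whole part type invalid
    return _parttypeforbidden.search(parttype) is None
```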

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>' + ('BB' * nbparams)
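The dynamically built format can be exercised directly with the `struct` module. A sketch of how the `(<size-of-key>, <size-of-value>)` byte pairs round-trip (the helper name mirrors `_makefpartparamsizes` but is restated here so the example is self-contained):

```python
import struct

def makefpartparamsizes(nbparams):
    # one unsigned byte pair (<size-of-key>, <size-of-value>) per parameter
    return '>' + ('BB' * nbparams)

# sizes for two parameters whose keys/values are 2/3 and 4/1 bytes long
fmt = makefpartparamsizes(2)
packed = struct.pack(fmt, 2, 3, 4, 1)
sizes = struct.unpack(fmt, packed)
pairs = list(zip(sizes[::2], sizes[1::2]))
```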

parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)
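The records container's semantics (category lookup, chronological iteration, reply chaining keyed by part id) can be demonstrated with a minimal self-contained stand-in mirroring `unbundlerecords`:

```python
class records(object):
    """Minimal stand-in mirroring unbundlerecords semantics."""

    def __init__(self):
        self._categories = {}   # category -> list of entries
        self._sequences = []    # chronological (category, entry) pairs
        self._replies = {}      # partid -> nested records

    def add(self, category, entry, inreplyto=None):
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        return self._replies.setdefault(partid, records())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

r = records()
r.add('changegroup', {'return': 1})
r.add('output', 'text', inreplyto=0)
```

Note how the entry added with `inreplyto=0` is recorded both in the main chronological sequence and in the nested records returned by `getreplies(0)`.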

class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle
    processing. The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.gettransaction = transactiongetter
        self.reply = None
        self.captureoutput = captureoutput

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    tr.hookargs['bundle2'] = '1'
    if source is not None and 'source' not in tr.hookargs:
        tr.hookargs['source'] = source
    if url is not None and 'url' not in tr.hookargs:
        tr.hookargs['url'] = url
    return processbundle(repo, unbundler, lambda: tr, op=op)

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    This is a very early version of this function that will be strongly
    reworked before final usage.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op.gettransaction is None:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except Exception as exc:
        for nbpart, part in iterparts:
            # consume the bundle content
            part.seek(0, 2)
        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from processing the old format. This is mostly needed
        # to handle different return codes to unbundle according to the
        # type of bundle. We should probably clean up or drop this return
        # code craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged
        raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function
    exits (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    hardabort = False
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                     params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam('in-reply-to', str(part.id), mandatory=False)
    # If exiting or interrupted, do not attempt to seek the stream in the
    # finally block below. This makes abort faster.
    except (SystemExit, KeyboardInterrupt):
        hardabort = True
        raise
    finally:
        # consume the part content to not corrupt the stream.
        if not hardabort:
            part.seek(0, 2)
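The dispatch rule implemented above (case-insensitive handler lookup, with the part type's case only encoding whether the part is mandatory) can be sketched standalone. The `handlers` table and the error class below are illustrative, not this module's API:

```python
handlers = {
    # handler keys are always lower case, as enforced by @parthandler
    'changegroup': lambda payload: ('applied', payload),
}

class UnknownPartError(Exception):
    pass

def dispatchpart(parttype, payload):
    # lookup is case insensitive; case only encodes mandatoriness
    handler = handlers.get(parttype.lower())
    # any uppercase char in the part type makes the part mandatory
    mandatory = parttype != parttype.lower()
    if handler is None:
        if mandatory:
            raise UnknownPartError(parttype)
        return None  # advisory part with no handler: ignored
    return handler(payload)
```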


def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line).
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps
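Assuming Python 3's `urllib.parse` quoting in place of `urlreq`, the caps blob format round-trips as follows (standalone restatements of `decodecaps`/`encodecaps` for illustration):

```python
from urllib.parse import quote, unquote

def encodecaps(caps):
    # one capability per line; values joined with commas
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v) for v in caps[ca]]
        ca = quote(ca)
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecaps(blob):
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps
```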

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

bundletypes = {
    "": ("", 'UN'), # only when using unbundle on ssh and old http servers
                    # since the unification ssh accepts a header but there
                    # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart`
    to populate it with parts. Then call `getchunks` to retrieve all the
    binary chunks of data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')

    def setcompression(self, alg):
        """setup core part compression to <alg>"""
        if alg is None:
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        # starting compression
        for chunk in self._compengine.compressstream(self._getcorechunk()):
            yield chunk
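The `compressstream` engine API used at the end of `getchunks` replaces the open-coded compressor loop: it takes an iterator of uncompressed chunks and yields compressed chunks, flushing at the end. A minimal zlib-based sketch of such an API (a hypothetical standalone function, not Mercurial's compression engine):

```python
import zlib

def compressstream(it, level=6):
    # wrap an iterator of uncompressed chunks, yield compressed chunks;
    # skip empty outputs and flush the compressor once the input is done
    z = zlib.compressobj(level)
    for chunk in it:
        data = z.compress(chunk)
        if data:
            yield data
    yield z.flush()

payload = [b'a' * 1000, b'b' * 1000]
compressed = b''.join(compressstream(payload))
assert zlib.decompress(compressed) == b''.join(payload)
```

Centralizing this loop in the engine means every caller gets the same flush and empty-chunk handling for free.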

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)
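The stream-level parameter block built by `_paramchunk` is a space-separated list of url-quoted names, each with an optional `=value`. A standalone sketch of that encoding (using `urllib.parse.quote` in place of `urlreq.quote`):

```python
from urllib.parse import quote

def paramchunk(params):
    # space-separated, url-quoted 'name' or 'name=value' entries,
    # mirroring the stream-level parameter encoding above
    blocks = []
    for par, value in params:
        par = quote(par)
        if value is not None:
            par = '%s=%s' % (par, quote(value))
        blocks.append(par)
    return ' '.join(blocks)

assert paramchunk([('Compression', 'BZ'), ('e v', None)]) == 'Compression=BZ e%20v'
```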

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)

    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged

class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))

    def _unpack(self, format):
        """unpack this struct format from the stream"""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream"""
        return changegroup.readexactly(self._fp, size)

    def seek(self, offset, whence=0):
        """move the underlying file pointer"""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def tell(self):
        """return the file offset, or None if the file is not seekable"""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler
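`getunbundler` peels a 4-byte magic string off the stream: a 2-byte `HG` marker followed by a 2-byte version that selects the unbundler class. A standalone sketch of just that framing check (`parsemagic` is a hypothetical helper name):

```python
import io

def parsemagic(fp):
    # read the 4-byte bundle magic and split it into the 'HG' marker
    # and the 2-byte version, mirroring getunbundler's checks above
    magicstring = fp.read(4)
    if len(magicstring) < 4:
        raise ValueError('stream ended while reading bundle magic')
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != b'HG':
        raise ValueError('not a Mercurial bundle')
    return version

assert parsemagic(io.BytesIO(b'HG20rest-of-stream')) == b'20'
```

The version lookup then happens against a registry (`formatmap` below) so new stream versions can be added without touching this dispatch code.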

class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process and decode a block of stream level parameters"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params

    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will
        be ignored when unknown. Those starting with an upper case letter
        are mandatory, and this function will raise a KeyError when they are
        unknown.

        Note: no options are currently supported. Any input will either be
        ignored or fail.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends, relying on the bundle
        being interpreted to find its end. This is terrible and we are
        sorry, but we needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
            assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)

    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                             values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True

class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. The remote side
    should be able to safely ignore the advisory ones.

    Neither data nor parameters can be modified after generation has begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise RuntimeError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no part id assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise RuntimeError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % exc)
            exc_info = sys.exc_info()
            msg = 'unexpected error: %s' % exc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            if pycompat.ispy3:
                raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
            else:
                exec("""raise exc_info[0], exc_info[1], exc_info[2]""")
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

1012 def _payloadchunks(self):
1007 def _payloadchunks(self):
1013 """yield chunks of a the part payload
1008 """yield chunks of a the part payload
1014
1009
1015 Exists to handle the different methods to provide data to a part."""
1010 Exists to handle the different methods to provide data to a part."""
1016 # we only support fixed size data now.
1011 # we only support fixed size data now.
1017 # This will be improved in the future.
1012 # This will be improved in the future.
1018 if util.safehasattr(self.data, 'next'):
1013 if util.safehasattr(self.data, 'next'):
1019 buff = util.chunkbuffer(self.data)
1014 buff = util.chunkbuffer(self.data)
1020 chunk = buff.read(preferedchunksize)
1015 chunk = buff.read(preferedchunksize)
1021 while chunk:
1016 while chunk:
1022 yield chunk
1017 yield chunk
1023 chunk = buff.read(preferedchunksize)
1018 chunk = buff.read(preferedchunksize)
1024 elif len(self.data):
1019 elif len(self.data):
1025 yield self.data
1020 yield self.data


flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capabilities

    This allows transmitting exceptions raised on the producer side during part
    iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise RuntimeError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] # (payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read the given format from the header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic-related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to the specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, super(unbundlepart, self).tell()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            super(unbundlepart, self).seek(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos,
                                             super(unbundlepart, self).tell()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def _readheader(self):
        """read the header and set up the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % self.id)
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param values
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as a dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', str(cg.extras['clcount']),
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only support v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks()):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will be massively reworked before
    being inflicted on any end-user.
    """
    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the ones contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

1370
1381 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1371 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1382 ['digest:%s' % k for k in util.DIGESTS.keys()])
1372 ['digest:%s' % k for k in util.DIGESTS.keys()])
1383 @parthandler('remote-changegroup', _remotechangegroupparams)
1373 @parthandler('remote-changegroup', _remotechangegroupparams)
1384 def handleremotechangegroup(op, inpart):
1374 def handleremotechangegroup(op, inpart):
1385 """apply a bundle10 on the repo, given an url and validation information
1375 """apply a bundle10 on the repo, given an url and validation information
1386
1376
1387 All the information about the remote bundle to import are given as
1377 All the information about the remote bundle to import are given as
1388 parameters. The parameters include:
1378 parameters. The parameters include:
1389 - url: the url to the bundle10.
1379 - url: the url to the bundle10.
1390 - size: the bundle10 file size. It is used to validate what was
1380 - size: the bundle10 file size. It is used to validate what was
1391 retrieved by the client matches the server knowledge about the bundle.
1381 retrieved by the client matches the server knowledge about the bundle.
1392 - digests: a space separated list of the digest types provided as
1382 - digests: a space separated list of the digest types provided as
1393 parameters.
1383 parameters.
1394 - digest:<digest-type>: the hexadecimal representation of the digest with
1384 - digest:<digest-type>: the hexadecimal representation of the digest with
1395 that name. Like the size, it is used to validate what was retrieved by
1385 that name. Like the size, it is used to validate what was retrieved by
1396 the client matches what the server knows about the bundle.
1386 the client matches what the server knows about the bundle.
1397
1387
1398 When multiple digest types are given, all of them are checked.
1388 When multiple digest types are given, all of them are checked.
1399 """
1389 """
1400 try:
1390 try:
1401 raw_url = inpart.params['url']
1391 raw_url = inpart.params['url']
1402 except KeyError:
1392 except KeyError:
1403 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1393 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1404 parsed_url = util.url(raw_url)
1394 parsed_url = util.url(raw_url)
1405 if parsed_url.scheme not in capabilities['remote-changegroup']:
1395 if parsed_url.scheme not in capabilities['remote-changegroup']:
1406 raise error.Abort(_('remote-changegroup does not support %s urls') %
1396 raise error.Abort(_('remote-changegroup does not support %s urls') %
1407 parsed_url.scheme)
1397 parsed_url.scheme)
1408
1398
1409 try:
1399 try:
1410 size = int(inpart.params['size'])
1400 size = int(inpart.params['size'])
1411 except ValueError:
1401 except ValueError:
1412 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1402 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1413 % 'size')
1403 % 'size')
1414 except KeyError:
1404 except KeyError:
1415 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1405 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1416
1406
1417 digests = {}
1407 digests = {}
1418 for typ in inpart.params.get('digests', '').split():
1408 for typ in inpart.params.get('digests', '').split():
1419 param = 'digest:%s' % typ
1409 param = 'digest:%s' % typ
1420 try:
1410 try:
1421 value = inpart.params[param]
1411 value = inpart.params[param]
1422 except KeyError:
1412 except KeyError:
1423 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1413 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1424 param)
1414 param)
1425 digests[typ] = value
1415 digests[typ] = value
1426
1416
1427 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1417 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1428
1418
1429 # Make sure we trigger a transaction creation
1419 # Make sure we trigger a transaction creation
1430 #
1420 #
1431 # The addchangegroup function will get a transaction object by itself, but
1421 # The addchangegroup function will get a transaction object by itself, but
1432 # we need to make sure we trigger the creation of a transaction object used
1422 # we need to make sure we trigger the creation of a transaction object used
1433 # for the whole processing scope.
1423 # for the whole processing scope.
1434 op.gettransaction()
1424 op.gettransaction()
1435 from . import exchange
1425 from . import exchange
1436 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1426 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1437 if not isinstance(cg, changegroup.cg1unpacker):
1427 if not isinstance(cg, changegroup.cg1unpacker):
1438 raise error.Abort(_('%s: not a bundle version 1.0') %
1428 raise error.Abort(_('%s: not a bundle version 1.0') %
1439 util.hidepassword(raw_url))
1429 util.hidepassword(raw_url))
1440 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1430 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1441 op.records.add('changegroup', {'return': ret})
1431 op.records.add('changegroup', {'return': ret})
1442 if op.reply is not None:
1432 if op.reply is not None:
1443 # This is definitely not the final form of this
1433 # This is definitely not the final form of this
1444 # return. But one need to start somewhere.
1434 # return. But one need to start somewhere.
1445 part = op.reply.newpart('reply:changegroup')
1435 part = op.reply.newpart('reply:changegroup')
1446 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1436 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1447 part.addparam('return', '%i' % ret, mandatory=False)
1437 part.addparam('return', '%i' % ret, mandatory=False)
1448 try:
1438 try:
1449 real_part.validate()
1439 real_part.validate()
1450 except error.Abort as e:
1440 except error.Abort as e:
1451 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1441 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1452 (util.hidepassword(raw_url), str(e)))
1442 (util.hidepassword(raw_url), str(e)))
1453 assert not inpart.read()
1443 assert not inpart.read()
1454
1444

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

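The payload format the handler above consumes is worth making explicit: it is simply N concatenated 20-byte binary node ids with no count prefix, terminated by end-of-stream, and the race check is an order-insensitive set comparison. A minimal standalone sketch of both steps (the function names here are illustrative, not Mercurial APIs):

```python
# Sketch of the check:heads wire format: the part payload is a flat run of
# 20-byte binary node ids; a short (empty) read marks the end.
import io

def readheads(fh):
    """Read 20-byte head records until the stream is exhausted."""
    heads = []
    h = fh.read(20)
    while len(h) == 20:
        heads.append(h)
        h = fh.read(20)
    assert not h  # payload length must be a multiple of 20
    return heads

payload = b'a' * 20 + b'b' * 20
assert readheads(io.BytesIO(payload)) == [b'a' * 20, b'b' * 20]

# The push-race check is then an order-insensitive comparison of the heads
# the client saw against the heads the repo has now.
def raced(advertised, current):
    return sorted(advertised) != sorted(current)

assert not raced([b'b' * 20, b'a' * 20], [b'a' * 20, b'b' * 20])
```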
@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

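The `old`/`new` pair carried by the pushkey part encodes a compare-and-swap: `op.repo.pushkey(...)` returns a truthy value only if the stored value still equals `old` when the update is attempted, which is why a falsy return on a mandatory part is escalated to `PushkeyFailed`. A hypothetical dict-backed illustration of that contract (not Mercurial's actual `pushkey` module):

```python
# Hypothetical compare-and-swap illustrating the pushkey contract: the
# update succeeds only if the caller's 'old' value still matches what is
# stored, mirroring the 0/1 return sent back on the wire.
def pushkey_cas(store, key, old, new):
    """Return 1 on success, 0 on a lost race."""
    if store.get(key, '') != old:
        return 0  # someone updated the key since the client last looked
    store[key] = new
    return 1

bookmarks = {'stable': 'rev1'}
assert pushkey_cas(bookmarks, 'stable', 'rev1', 'rev2') == 1
assert bookmarks['stable'] == 'rev2'
# a second client still holding old='rev1' now loses the race
assert pushkey_cas(bookmarks, 'stable', 'rev1', 'rev3') == 0
```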
@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)

@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
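The hgtagsfnodes payload decoded above is another fixed-width record stream: a flat sequence of (changeset node, .hgtags filenode) pairs, 20 bytes each, where a short read signals the end. A standalone sketch of that decoding loop (the function name is illustrative, not a Mercurial API):

```python
# Sketch of the hgtagsfnodes payload: (node, fnode) pairs of 20 bytes each;
# an incomplete trailing record is ignored, matching the handler above.
import io

def readfnodepairs(fh):
    pairs = []
    while True:
        node = fh.read(20)
        fnode = fh.read(20)
        if len(node) < 20 or len(fnode) < 20:
            break  # short read: end of stream or truncated record
        pairs.append((node, fnode))
    return pairs

payload = b'n' * 20 + b'f' * 20 + b'm' * 20 + b'g' * 20
assert readfnodepairs(io.BytesIO(payload)) == [
    (b'n' * 20, b'f' * 20), (b'm' * 20, b'g' * 20)]
```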