bundle2: move 'seek' and 'tell' methods off the unpackermixin class...
Pierre-Yves David - r31889:a02e7730 (default branch)
@@ -1,1645 +1,1656 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
3 # Copyright 2013 Facebook, Inc.
4 #
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
8
9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 payloads in an application-agnostic way. It consists of a sequence of "parts"
11 that will be handed to and processed by the application layer.
12
13
14 General format architecture
15 ===========================
16
17 The format is structured as follows:
18
19 - magic string
20 - stream level parameters
21 - payload parts (any number)
22 - end of stream marker.
23
24 The binary format
25 ============================
26
27 All numbers are unsigned and big-endian.
28
29 stream level parameters
30 ------------------------
31
32 The binary format is as follows:
33
34 :params size: int32
35
36 The total number of bytes used by the parameters.
37
38 :params value: arbitrary number of bytes
39
40 A blob of `params size` bytes containing the serialized version of all stream level
41 parameters.
42
43 The blob contains a space-separated list of parameters. Parameters with a value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
46 Empty names are forbidden.
47
48 Names MUST start with a letter. If the first letter is lower case, the
49 parameter is advisory and can be safely ignored. However, when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if it is not able to process it.
52
53 Stream parameters use a simple textual format for two main reasons:
54
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
57 - Textual data allows easy human inspection of a bundle2 header in case of
58 trouble.
59
60 Any application-level options MUST go into a bundle2 part instead.
61
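As a rough, standalone illustration of this framing (a sketch in modern Python using only the standard library; ``encode_stream_params`` and ``decode_stream_params`` are hypothetical helpers, not part of this module)::

    import struct
    from urllib.parse import quote, unquote

    def encode_stream_params(params):
        # params is a list of (name, value-or-None) pairs
        blocks = []
        for name, value in params:
            item = quote(name, safe='')
            if value is not None:
                item += '=' + quote(value, safe='')
            blocks.append(item)
        blob = ' '.join(blocks).encode('ascii')
        # int32 byte count, big-endian, followed by the blob itself
        return struct.pack('>i', len(blob)) + blob

    def decode_stream_params(fp):
        size = struct.unpack('>i', fp.read(4))[0]
        blob = fp.read(size).decode('ascii')
        params = {}
        for item in blob.split(' ') if blob else []:
            name, _, value = item.partition('=')
            params[unquote(name)] = unquote(value) if value else None
        return params
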
62 Payload part
63 ------------------------
64
65 The binary format is as follows:
66
67 :header size: int32
68
69 The total number of bytes used by the part header. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
71
72 :header:
73
74 The header defines how to interpret the part. It contains two pieces of
75 data: the part type and the part parameters.
76
77 The part type is used to route the part to an application-level handler that
78 can interpret the payload.
79
80 Part parameters are passed to the application-level handler. They are
81 meant to convey information that will help the application-level object to
82 interpret the part payload.
83
84 The binary format of the header is as follows:
85
86 :typesize: (one byte)
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 to this part.
92
93 :parameters:
94
95 A part's parameters may have arbitrary content; the binary structure is::
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
99 :mandatory-count: 1 byte, number of mandatory parameters
100
101 :advisory-count: 1 byte, number of advisory parameters
102
103 :param-sizes:
104
105 N pairs of bytes, where N is the total number of parameters. Each
106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107
108 :param-data:
109
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size pairs stored in the previous
112 field.
113
114 Mandatory parameters come first, then the advisory ones.
115
116 Each parameter's key MUST be unique within the part.
117
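A minimal sketch of decoding one part header laid out as above, assuming the int32 ``header size`` has already been consumed (``read_part_header`` is a hypothetical helper; the real parsing lives in the unbundler classes below)::

    import struct

    def read_part_header(fp):
        # <typesize><parttype><partid><mandatory-count><advisory-count>
        # <param-sizes><param-data>
        typesize = struct.unpack('>B', fp.read(1))[0]
        parttype = fp.read(typesize).decode('ascii')
        partid = struct.unpack('>I', fp.read(4))[0]
        mancount, advcount = struct.unpack('>BB', fp.read(2))
        total = mancount + advcount
        sizes = struct.unpack('>' + 'BB' * total, fp.read(2 * total))
        params = []
        for i in range(total):
            key = fp.read(sizes[2 * i]).decode('ascii')
            value = fp.read(sizes[2 * i + 1]).decode('ascii')
            params.append((key, value, i < mancount))  # True means mandatory
        return parttype, partid, params
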
118 :payload:
119
120 The payload is a series of `<chunksize><chunkdata>` chunks.
121
122 `chunksize` is an int32 and `chunkdata` are plain bytes (as many as
123 `chunksize` says). The payload is concluded by a zero-size chunk.
124
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
127
128 `chunksize` can be negative to trigger special-case processing. No such
129 processing is in place yet.
130
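The payload framing can be consumed with a loop along these lines (again a standalone sketch; negative chunk sizes, reserved for special-case processing, are simply rejected here)::

    import struct

    def iter_payload_chunks(fp):
        while True:
            chunksize = struct.unpack('>i', fp.read(4))[0]
            if chunksize == 0:
                return          # the zero-size chunk ends the payload
            if chunksize < 0:
                raise ValueError('special chunk sizes are not handled here')
            yield fp.read(chunksize)
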
131 Bundle processing
132 ============================
133
134 Each part is processed in order using a "part handler". Handlers are registered
135 for a certain part type.
136
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the part type
139 contains any uppercase character it is considered mandatory. When no handler is
140 known for a mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the parts read after the abort are processed. In the
144 future, dropping the stream may become an option for channels we do not care to
145 preserve.
146 """
147
147
148 from __future__ import absolute_import
148 from __future__ import absolute_import
149
149
150 import errno
150 import errno
151 import re
151 import re
152 import string
152 import string
153 import struct
153 import struct
154 import sys
154 import sys
155
155
156 from .i18n import _
156 from .i18n import _
157 from . import (
157 from . import (
158 changegroup,
158 changegroup,
159 error,
159 error,
160 obsolete,
160 obsolete,
161 pushkey,
161 pushkey,
162 pycompat,
162 pycompat,
163 tags,
163 tags,
164 url,
164 url,
165 util,
165 util,
166 )
166 )
167
167
168 urlerr = util.urlerr
168 urlerr = util.urlerr
169 urlreq = util.urlreq
169 urlreq = util.urlreq
170
170
171 _pack = struct.pack
171 _pack = struct.pack
172 _unpack = struct.unpack
172 _unpack = struct.unpack
173
173
174 _fstreamparamsize = '>i'
174 _fstreamparamsize = '>i'
175 _fpartheadersize = '>i'
175 _fpartheadersize = '>i'
176 _fparttypesize = '>B'
176 _fparttypesize = '>B'
177 _fpartid = '>I'
177 _fpartid = '>I'
178 _fpayloadsize = '>i'
178 _fpayloadsize = '>i'
179 _fpartparamcount = '>BB'
179 _fpartparamcount = '>BB'
180
180
181 preferedchunksize = 4096
181 preferedchunksize = 4096
182
182
183 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
183 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
184
184
185 def outdebug(ui, message):
185 def outdebug(ui, message):
186 """debug regarding output stream (bundling)"""
186 """debug regarding output stream (bundling)"""
187 if ui.configbool('devel', 'bundle2.debug', False):
187 if ui.configbool('devel', 'bundle2.debug', False):
188 ui.debug('bundle2-output: %s\n' % message)
188 ui.debug('bundle2-output: %s\n' % message)
189
189
190 def indebug(ui, message):
190 def indebug(ui, message):
191 """debug on input stream (unbundling)"""
191 """debug on input stream (unbundling)"""
192 if ui.configbool('devel', 'bundle2.debug', False):
192 if ui.configbool('devel', 'bundle2.debug', False):
193 ui.debug('bundle2-input: %s\n' % message)
193 ui.debug('bundle2-input: %s\n' % message)
194
194
195 def validateparttype(parttype):
195 def validateparttype(parttype):
196 """raise ValueError if a parttype contains invalid character"""
196 """raise ValueError if a parttype contains invalid character"""
197 if _parttypeforbidden.search(parttype):
197 if _parttypeforbidden.search(parttype):
198 raise ValueError(parttype)
198 raise ValueError(parttype)
199
199
200 def _makefpartparamsizes(nbparams):
200 def _makefpartparamsizes(nbparams):
201 """return a struct format to read part parameter sizes
201 """return a struct format to read part parameter sizes
202
202
203 The number parameters is variable so we need to build that format
203 The number parameters is variable so we need to build that format
204 dynamically.
204 dynamically.
205 """
205 """
206 return '>'+('BB'*nbparams)
206 return '>'+('BB'*nbparams)
207
207
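For example, a part carrying three parameters would be read with the dynamically built format string::

    _makefpartparamsizes(3) == '>BBBBBB'
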
208 parthandlermapping = {}
208 parthandlermapping = {}
209
209
210 def parthandler(parttype, params=()):
210 def parthandler(parttype, params=()):
211 """decorator that register a function as a bundle2 part handler
211 """decorator that register a function as a bundle2 part handler
212
212
213 eg::
213 eg::
214
214
215 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
215 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
216 def myparttypehandler(...):
216 def myparttypehandler(...):
217 '''process a part of type "my part".'''
217 '''process a part of type "my part".'''
218 ...
218 ...
219 """
219 """
220 validateparttype(parttype)
220 validateparttype(parttype)
221 def _decorator(func):
221 def _decorator(func):
222 lparttype = parttype.lower() # enforce lower case matching.
222 lparttype = parttype.lower() # enforce lower case matching.
223 assert lparttype not in parthandlermapping
223 assert lparttype not in parthandlermapping
224 parthandlermapping[lparttype] = func
224 parthandlermapping[lparttype] = func
225 func.params = frozenset(params)
225 func.params = frozenset(params)
226 return func
226 return func
227 return _decorator
227 return _decorator
228
228
229 class unbundlerecords(object):
229 class unbundlerecords(object):
230 """keep record of what happens during and unbundle
230 """keep record of what happens during and unbundle
231
231
232 New records are added using `records.add('cat', obj)`. Where 'cat' is a
232 New records are added using `records.add('cat', obj)`. Where 'cat' is a
233 category of record and obj is an arbitrary object.
233 category of record and obj is an arbitrary object.
234
234
235 `records['cat']` will return all entries of this category 'cat'.
235 `records['cat']` will return all entries of this category 'cat'.
236
236
237 Iterating on the object itself will yield `('category', obj)` tuples
237 Iterating on the object itself will yield `('category', obj)` tuples
238 for all entries.
238 for all entries.
239
239
240 All iterations happens in chronological order.
240 All iterations happens in chronological order.
241 """
241 """
242
242
243 def __init__(self):
243 def __init__(self):
244 self._categories = {}
244 self._categories = {}
245 self._sequences = []
245 self._sequences = []
246 self._replies = {}
246 self._replies = {}
247
247
248 def add(self, category, entry, inreplyto=None):
248 def add(self, category, entry, inreplyto=None):
249 """add a new record of a given category.
249 """add a new record of a given category.
250
250
251 The entry can then be retrieved in the list returned by
251 The entry can then be retrieved in the list returned by
252 self['category']."""
252 self['category']."""
253 self._categories.setdefault(category, []).append(entry)
253 self._categories.setdefault(category, []).append(entry)
254 self._sequences.append((category, entry))
254 self._sequences.append((category, entry))
255 if inreplyto is not None:
255 if inreplyto is not None:
256 self.getreplies(inreplyto).add(category, entry)
256 self.getreplies(inreplyto).add(category, entry)
257
257
258 def getreplies(self, partid):
258 def getreplies(self, partid):
259 """get the records that are replies to a specific part"""
259 """get the records that are replies to a specific part"""
260 return self._replies.setdefault(partid, unbundlerecords())
260 return self._replies.setdefault(partid, unbundlerecords())
261
261
262 def __getitem__(self, cat):
262 def __getitem__(self, cat):
263 return tuple(self._categories.get(cat, ()))
263 return tuple(self._categories.get(cat, ()))
264
264
265 def __iter__(self):
265 def __iter__(self):
266 return iter(self._sequences)
266 return iter(self._sequences)
267
267
268 def __len__(self):
268 def __len__(self):
269 return len(self._sequences)
269 return len(self._sequences)
270
270
271 def __nonzero__(self):
271 def __nonzero__(self):
272 return bool(self._sequences)
272 return bool(self._sequences)
273
273
274 __bool__ = __nonzero__
274 __bool__ = __nonzero__
275
275
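A short usage sketch for this record container (the category name and entries are made up)::

    records = unbundlerecords()
    records.add('changegroup', {'return': 1})
    records.add('changegroup', {'return': 0}, inreplyto=3)
    assert len(records['changegroup']) == 2
    # iteration yields (category, entry) tuples in chronological order
    assert list(records)[0] == ('changegroup', {'return': 1})
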
276 class bundleoperation(object):
276 class bundleoperation(object):
277 """an object that represents a single bundling process
277 """an object that represents a single bundling process
278
278
279 Its purpose is to carry unbundle-related objects and states.
279 Its purpose is to carry unbundle-related objects and states.
280
280
281 A new object should be created at the beginning of each bundle processing.
281 A new object should be created at the beginning of each bundle processing.
282 The object is to be returned by the processing function.
282 The object is to be returned by the processing function.
283
283
284 The object has very little content now; it will ultimately contain:
284 The object has very little content now; it will ultimately contain:
285 * an access to the repo the bundle is applied to,
285 * an access to the repo the bundle is applied to,
286 * a ui object,
286 * a ui object,
287 * a way to retrieve a transaction to add changes to the repo,
287 * a way to retrieve a transaction to add changes to the repo,
288 * a way to record the result of processing each part,
288 * a way to record the result of processing each part,
289 * a way to construct a bundle response when applicable.
289 * a way to construct a bundle response when applicable.
290 """
290 """
291
291
292 def __init__(self, repo, transactiongetter, captureoutput=True):
292 def __init__(self, repo, transactiongetter, captureoutput=True):
293 self.repo = repo
293 self.repo = repo
294 self.ui = repo.ui
294 self.ui = repo.ui
295 self.records = unbundlerecords()
295 self.records = unbundlerecords()
296 self.gettransaction = transactiongetter
296 self.gettransaction = transactiongetter
297 self.reply = None
297 self.reply = None
298 self.captureoutput = captureoutput
298 self.captureoutput = captureoutput
299
299
300 class TransactionUnavailable(RuntimeError):
300 class TransactionUnavailable(RuntimeError):
301 pass
301 pass
302
302
303 def _notransaction():
303 def _notransaction():
304 """default method to get a transaction while processing a bundle
304 """default method to get a transaction while processing a bundle
305
305
306 Raise an exception to highlight the fact that no transaction was expected
306 Raise an exception to highlight the fact that no transaction was expected
307 to be created"""
307 to be created"""
308 raise TransactionUnavailable()
308 raise TransactionUnavailable()
309
309
310 def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
310 def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
311 # transform me into unbundler.apply() as soon as the freeze is lifted
311 # transform me into unbundler.apply() as soon as the freeze is lifted
312 tr.hookargs['bundle2'] = '1'
312 tr.hookargs['bundle2'] = '1'
313 if source is not None and 'source' not in tr.hookargs:
313 if source is not None and 'source' not in tr.hookargs:
314 tr.hookargs['source'] = source
314 tr.hookargs['source'] = source
315 if url is not None and 'url' not in tr.hookargs:
315 if url is not None and 'url' not in tr.hookargs:
316 tr.hookargs['url'] = url
316 tr.hookargs['url'] = url
317 return processbundle(repo, unbundler, lambda: tr, op=op)
317 return processbundle(repo, unbundler, lambda: tr, op=op)
318
318
319 def processbundle(repo, unbundler, transactiongetter=None, op=None):
319 def processbundle(repo, unbundler, transactiongetter=None, op=None):
320 """This function process a bundle, apply effect to/from a repo
320 """This function process a bundle, apply effect to/from a repo
321
321
322 It iterates over each part then searches for and uses the proper handling
322 It iterates over each part then searches for and uses the proper handling
323 code to process the part. Parts are processed in order.
323 code to process the part. Parts are processed in order.
324
324
325 Unknown Mandatory part will abort the process.
325 Unknown Mandatory part will abort the process.
326
326
327 It is temporarily possible to provide a prebuilt bundleoperation to the
327 It is temporarily possible to provide a prebuilt bundleoperation to the
328 function. This is used to ensure output is properly propagated in case of
328 function. This is used to ensure output is properly propagated in case of
329 an error during the unbundling. This output capturing part will likely be
329 an error during the unbundling. This output capturing part will likely be
330 reworked and this ability will probably go away in the process.
330 reworked and this ability will probably go away in the process.
331 """
331 """
332 if op is None:
332 if op is None:
333 if transactiongetter is None:
333 if transactiongetter is None:
334 transactiongetter = _notransaction
334 transactiongetter = _notransaction
335 op = bundleoperation(repo, transactiongetter)
335 op = bundleoperation(repo, transactiongetter)
336 # todo:
336 # todo:
337 # - replace this with an init function soon.
337 # - replace this with an init function soon.
338 # - exception catching
338 # - exception catching
339 unbundler.params
339 unbundler.params
340 if repo.ui.debugflag:
340 if repo.ui.debugflag:
341 msg = ['bundle2-input-bundle:']
341 msg = ['bundle2-input-bundle:']
342 if unbundler.params:
342 if unbundler.params:
343 msg.append(' %i params' % len(unbundler.params))
343 msg.append(' %i params' % len(unbundler.params))
344 if op.gettransaction is None:
344 if op.gettransaction is None:
345 msg.append(' no-transaction')
345 msg.append(' no-transaction')
346 else:
346 else:
347 msg.append(' with-transaction')
347 msg.append(' with-transaction')
348 msg.append('\n')
348 msg.append('\n')
349 repo.ui.debug(''.join(msg))
349 repo.ui.debug(''.join(msg))
350 iterparts = enumerate(unbundler.iterparts())
350 iterparts = enumerate(unbundler.iterparts())
351 part = None
351 part = None
352 nbpart = 0
352 nbpart = 0
353 try:
353 try:
354 for nbpart, part in iterparts:
354 for nbpart, part in iterparts:
355 _processpart(op, part)
355 _processpart(op, part)
356 except Exception as exc:
356 except Exception as exc:
357 for nbpart, part in iterparts:
357 for nbpart, part in iterparts:
358 # consume the bundle content
358 # consume the bundle content
359 part.seek(0, 2)
359 part.seek(0, 2)
360 # Small hack to let caller code distinguish exceptions from bundle2
360 # Small hack to let caller code distinguish exceptions from bundle2
361 # processing from processing the old format. This is mostly
361 # processing from processing the old format. This is mostly
362 # needed to handle different return codes to unbundle according to the
362 # needed to handle different return codes to unbundle according to the
363 # type of bundle. We should probably clean up or drop this return code
363 # type of bundle. We should probably clean up or drop this return code
364 # craziness in a future version.
364 # craziness in a future version.
365 exc.duringunbundle2 = True
365 exc.duringunbundle2 = True
366 salvaged = []
366 salvaged = []
367 replycaps = None
367 replycaps = None
368 if op.reply is not None:
368 if op.reply is not None:
369 salvaged = op.reply.salvageoutput()
369 salvaged = op.reply.salvageoutput()
370 replycaps = op.reply.capabilities
370 replycaps = op.reply.capabilities
371 exc._replycaps = replycaps
371 exc._replycaps = replycaps
372 exc._bundle2salvagedoutput = salvaged
372 exc._bundle2salvagedoutput = salvaged
373 raise
373 raise
374 finally:
374 finally:
375 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
375 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
376
376
377 return op
377 return op
378
378
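A hedged sketch of feeding a bundle into a repository with the helpers above (assuming ``repo`` is a local repository object and ``tr`` an already-open transaction; ``getunbundler`` is defined further down in this module, and the source and file names are arbitrary)::

    with open('incoming.hg', 'rb') as fp:
        unbundler = getunbundler(repo.ui, fp)
        op = applybundle(repo, unbundler, tr, source='push',
                         url='file:incoming.hg')
    # inspect what happened during the unbundle
    for category, entry in op.records:
        repo.ui.debug('%s: %r\n' % (category, entry))
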
379 def _processpart(op, part):
379 def _processpart(op, part):
380 """process a single part from a bundle
380 """process a single part from a bundle
381
381
382 The part is guaranteed to have been fully consumed when the function exits
382 The part is guaranteed to have been fully consumed when the function exits
383 (even if an exception is raised)."""
383 (even if an exception is raised)."""
384 status = 'unknown' # used by debug output
384 status = 'unknown' # used by debug output
385 hardabort = False
385 hardabort = False
386 try:
386 try:
387 try:
387 try:
388 handler = parthandlermapping.get(part.type)
388 handler = parthandlermapping.get(part.type)
389 if handler is None:
389 if handler is None:
390 status = 'unsupported-type'
390 status = 'unsupported-type'
391 raise error.BundleUnknownFeatureError(parttype=part.type)
391 raise error.BundleUnknownFeatureError(parttype=part.type)
392 indebug(op.ui, 'found a handler for part %r' % part.type)
392 indebug(op.ui, 'found a handler for part %r' % part.type)
393 unknownparams = part.mandatorykeys - handler.params
393 unknownparams = part.mandatorykeys - handler.params
394 if unknownparams:
394 if unknownparams:
395 unknownparams = list(unknownparams)
395 unknownparams = list(unknownparams)
396 unknownparams.sort()
396 unknownparams.sort()
397 status = 'unsupported-params (%s)' % unknownparams
397 status = 'unsupported-params (%s)' % unknownparams
398 raise error.BundleUnknownFeatureError(parttype=part.type,
398 raise error.BundleUnknownFeatureError(parttype=part.type,
399 params=unknownparams)
399 params=unknownparams)
400 status = 'supported'
400 status = 'supported'
401 except error.BundleUnknownFeatureError as exc:
401 except error.BundleUnknownFeatureError as exc:
402 if part.mandatory: # mandatory parts
402 if part.mandatory: # mandatory parts
403 raise
403 raise
404 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
404 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
405 return # skip to part processing
405 return # skip to part processing
406 finally:
406 finally:
407 if op.ui.debugflag:
407 if op.ui.debugflag:
408 msg = ['bundle2-input-part: "%s"' % part.type]
408 msg = ['bundle2-input-part: "%s"' % part.type]
409 if not part.mandatory:
409 if not part.mandatory:
410 msg.append(' (advisory)')
410 msg.append(' (advisory)')
411 nbmp = len(part.mandatorykeys)
411 nbmp = len(part.mandatorykeys)
412 nbap = len(part.params) - nbmp
412 nbap = len(part.params) - nbmp
413 if nbmp or nbap:
413 if nbmp or nbap:
414 msg.append(' (params:')
414 msg.append(' (params:')
415 if nbmp:
415 if nbmp:
416 msg.append(' %i mandatory' % nbmp)
416 msg.append(' %i mandatory' % nbmp)
417 if nbap:
417 if nbap:
418 msg.append(' %i advisory' % nbap)
418 msg.append(' %i advisory' % nbap)
419 msg.append(')')
419 msg.append(')')
420 msg.append(' %s\n' % status)
420 msg.append(' %s\n' % status)
421 op.ui.debug(''.join(msg))
421 op.ui.debug(''.join(msg))
422
422
423 # handler is called outside the above try block so that we don't
423 # handler is called outside the above try block so that we don't
424 # risk catching KeyErrors from anything other than the
424 # risk catching KeyErrors from anything other than the
425 # parthandlermapping lookup (any KeyError raised by handler()
425 # parthandlermapping lookup (any KeyError raised by handler()
426 # itself represents a defect of a different variety).
426 # itself represents a defect of a different variety).
427 output = None
427 output = None
428 if op.captureoutput and op.reply is not None:
428 if op.captureoutput and op.reply is not None:
429 op.ui.pushbuffer(error=True, subproc=True)
429 op.ui.pushbuffer(error=True, subproc=True)
430 output = ''
430 output = ''
431 try:
431 try:
432 handler(op, part)
432 handler(op, part)
433 finally:
433 finally:
434 if output is not None:
434 if output is not None:
435 output = op.ui.popbuffer()
435 output = op.ui.popbuffer()
436 if output:
436 if output:
437 outpart = op.reply.newpart('output', data=output,
437 outpart = op.reply.newpart('output', data=output,
438 mandatory=False)
438 mandatory=False)
439 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
439 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
440 # If exiting or interrupted, do not attempt to seek the stream in the
440 # If exiting or interrupted, do not attempt to seek the stream in the
441 # finally block below. This makes abort faster.
441 # finally block below. This makes abort faster.
442 except (SystemExit, KeyboardInterrupt):
442 except (SystemExit, KeyboardInterrupt):
443 hardabort = True
443 hardabort = True
444 raise
444 raise
445 finally:
445 finally:
446 # consume the part content to not corrupt the stream.
446 # consume the part content to not corrupt the stream.
447 if not hardabort:
447 if not hardabort:
448 part.seek(0, 2)
448 part.seek(0, 2)
449
449
450
450
451 def decodecaps(blob):
451 def decodecaps(blob):
452 """decode a bundle2 caps bytes blob into a dictionary
452 """decode a bundle2 caps bytes blob into a dictionary
453
453
454 The blob is a list of capabilities (one per line)
454 The blob is a list of capabilities (one per line)
455 Capabilities may have values using a line of the form::
455 Capabilities may have values using a line of the form::
456
456
457 capability=value1,value2,value3
457 capability=value1,value2,value3
458
458
459 The values are always a list."""
459 The values are always a list."""
460 caps = {}
460 caps = {}
461 for line in blob.splitlines():
461 for line in blob.splitlines():
462 if not line:
462 if not line:
463 continue
463 continue
464 if '=' not in line:
464 if '=' not in line:
465 key, vals = line, ()
465 key, vals = line, ()
466 else:
466 else:
467 key, vals = line.split('=', 1)
467 key, vals = line.split('=', 1)
468 vals = vals.split(',')
468 vals = vals.split(',')
469 key = urlreq.unquote(key)
469 key = urlreq.unquote(key)
470 vals = [urlreq.unquote(v) for v in vals]
470 vals = [urlreq.unquote(v) for v in vals]
471 caps[key] = vals
471 caps[key] = vals
472 return caps
472 return caps
473
473
474 def encodecaps(caps):
474 def encodecaps(caps):
475 """encode a bundle2 caps dictionary into a bytes blob"""
475 """encode a bundle2 caps dictionary into a bytes blob"""
476 chunks = []
476 chunks = []
477 for ca in sorted(caps):
477 for ca in sorted(caps):
478 vals = caps[ca]
478 vals = caps[ca]
479 ca = urlreq.quote(ca)
479 ca = urlreq.quote(ca)
480 vals = [urlreq.quote(v) for v in vals]
480 vals = [urlreq.quote(v) for v in vals]
481 if vals:
481 if vals:
482 ca = "%s=%s" % (ca, ','.join(vals))
482 ca = "%s=%s" % (ca, ','.join(vals))
483 chunks.append(ca)
483 chunks.append(ca)
484 return '\n'.join(chunks)
484 return '\n'.join(chunks)
485
485
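For instance, the two helpers round-trip a capabilities dictionary through the textual blob::

    caps = {'HG20': [], 'changegroup': ['01', '02']}
    blob = encodecaps(caps)
    # blob == 'HG20\nchangegroup=01,02'
    assert decodecaps(blob) == caps
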
486 bundletypes = {
486 bundletypes = {
487 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
487 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
488 # since the unification ssh accepts a header but there
488 # since the unification ssh accepts a header but there
489 # is no capability signaling it.
489 # is no capability signaling it.
490 "HG20": (), # special-cased below
490 "HG20": (), # special-cased below
491 "HG10UN": ("HG10UN", 'UN'),
491 "HG10UN": ("HG10UN", 'UN'),
492 "HG10BZ": ("HG10", 'BZ'),
492 "HG10BZ": ("HG10", 'BZ'),
493 "HG10GZ": ("HG10GZ", 'GZ'),
493 "HG10GZ": ("HG10GZ", 'GZ'),
494 }
494 }
495
495
496 # hgweb uses this list to communicate its preferred type
496 # hgweb uses this list to communicate its preferred type
497 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
497 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
498
498
499 class bundle20(object):
499 class bundle20(object):
500 """represent an outgoing bundle2 container
500 """represent an outgoing bundle2 container
501
501
502 Use the `addparam` method to add stream level parameter. and `newpart` to
502 Use the `addparam` method to add stream level parameter. and `newpart` to
503 populate it. Then call `getchunks` to retrieve all the binary chunks of
503 populate it. Then call `getchunks` to retrieve all the binary chunks of
504 data that compose the bundle2 container."""
504 data that compose the bundle2 container."""
505
505
506 _magicstring = 'HG20'
506 _magicstring = 'HG20'
507
507
508 def __init__(self, ui, capabilities=()):
508 def __init__(self, ui, capabilities=()):
509 self.ui = ui
509 self.ui = ui
510 self._params = []
510 self._params = []
511 self._parts = []
511 self._parts = []
512 self.capabilities = dict(capabilities)
512 self.capabilities = dict(capabilities)
513 self._compengine = util.compengines.forbundletype('UN')
513 self._compengine = util.compengines.forbundletype('UN')
514 self._compopts = None
514 self._compopts = None
515
515
516 def setcompression(self, alg, compopts=None):
516 def setcompression(self, alg, compopts=None):
517 """setup core part compression to <alg>"""
517 """setup core part compression to <alg>"""
518 if alg in (None, 'UN'):
518 if alg in (None, 'UN'):
519 return
519 return
520 assert not any(n.lower() == 'compression' for n, v in self._params)
520 assert not any(n.lower() == 'compression' for n, v in self._params)
521 self.addparam('Compression', alg)
521 self.addparam('Compression', alg)
522 self._compengine = util.compengines.forbundletype(alg)
522 self._compengine = util.compengines.forbundletype(alg)
523 self._compopts = compopts
523 self._compopts = compopts
524
524
525 @property
525 @property
526 def nbparts(self):
526 def nbparts(self):
527 """total number of parts added to the bundler"""
527 """total number of parts added to the bundler"""
528 return len(self._parts)
528 return len(self._parts)
529
529
530 # methods used to defines the bundle2 content
530 # methods used to defines the bundle2 content
531 def addparam(self, name, value=None):
531 def addparam(self, name, value=None):
532 """add a stream level parameter"""
532 """add a stream level parameter"""
533 if not name:
533 if not name:
534 raise ValueError('empty parameter name')
534 raise ValueError('empty parameter name')
535 if name[0] not in string.letters:
535 if name[0] not in string.letters:
536 raise ValueError('non letter first character: %r' % name)
536 raise ValueError('non letter first character: %r' % name)
537 self._params.append((name, value))
537 self._params.append((name, value))
538
538
539 def addpart(self, part):
539 def addpart(self, part):
540 """add a new part to the bundle2 container
540 """add a new part to the bundle2 container
541
541
542 Parts contains the actual applicative payload."""
542 Parts contains the actual applicative payload."""
543 assert part.id is None
543 assert part.id is None
544 part.id = len(self._parts) # very cheap counter
544 part.id = len(self._parts) # very cheap counter
545 self._parts.append(part)
545 self._parts.append(part)
546
546
547 def newpart(self, typeid, *args, **kwargs):
547 def newpart(self, typeid, *args, **kwargs):
548 """create a new part and add it to the containers
548 """create a new part and add it to the containers
549
549
550 The part is directly added to the container. For now, this means
550 The part is directly added to the container. For now, this means
551 that any failure to properly initialize the part after calling
551 that any failure to properly initialize the part after calling
552 ``newpart`` should result in a failure of the whole bundling process.
552 ``newpart`` should result in a failure of the whole bundling process.
553
553
554 You can still fall back to manually create and add if you need better
554 You can still fall back to manually create and add if you need better
555 control."""
555 control."""
556 part = bundlepart(typeid, *args, **kwargs)
556 part = bundlepart(typeid, *args, **kwargs)
557 self.addpart(part)
557 self.addpart(part)
558 return part
558 return part
559
559
560 # methods used to generate the bundle2 stream
560 # methods used to generate the bundle2 stream
561 def getchunks(self):
561 def getchunks(self):
562 if self.ui.debugflag:
562 if self.ui.debugflag:
563 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
563 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
564 if self._params:
564 if self._params:
565 msg.append(' (%i params)' % len(self._params))
565 msg.append(' (%i params)' % len(self._params))
566 msg.append(' %i parts total\n' % len(self._parts))
566 msg.append(' %i parts total\n' % len(self._parts))
567 self.ui.debug(''.join(msg))
567 self.ui.debug(''.join(msg))
568 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
568 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
569 yield self._magicstring
569 yield self._magicstring
570 param = self._paramchunk()
570 param = self._paramchunk()
571 outdebug(self.ui, 'bundle parameter: %s' % param)
571 outdebug(self.ui, 'bundle parameter: %s' % param)
572 yield _pack(_fstreamparamsize, len(param))
572 yield _pack(_fstreamparamsize, len(param))
573 if param:
573 if param:
574 yield param
574 yield param
575 for chunk in self._compengine.compressstream(self._getcorechunk(),
575 for chunk in self._compengine.compressstream(self._getcorechunk(),
576 self._compopts):
576 self._compopts):
577 yield chunk
577 yield chunk
578
578
579 def _paramchunk(self):
579 def _paramchunk(self):
580 """return a encoded version of all stream parameters"""
580 """return a encoded version of all stream parameters"""
581 blocks = []
581 blocks = []
582 for par, value in self._params:
582 for par, value in self._params:
583 par = urlreq.quote(par)
583 par = urlreq.quote(par)
584 if value is not None:
584 if value is not None:
585 value = urlreq.quote(value)
585 value = urlreq.quote(value)
586 par = '%s=%s' % (par, value)
586 par = '%s=%s' % (par, value)
587 blocks.append(par)
587 blocks.append(par)
588 return ' '.join(blocks)
588 return ' '.join(blocks)
589
589
590 def _getcorechunk(self):
590 def _getcorechunk(self):
591 """yield chunk for the core part of the bundle
591 """yield chunk for the core part of the bundle
592
592
593 (all but headers and parameters)"""
593 (all but headers and parameters)"""
594 outdebug(self.ui, 'start of parts')
594 outdebug(self.ui, 'start of parts')
595 for part in self._parts:
595 for part in self._parts:
596 outdebug(self.ui, 'bundle part: "%s"' % part.type)
596 outdebug(self.ui, 'bundle part: "%s"' % part.type)
597 for chunk in part.getchunks(ui=self.ui):
597 for chunk in part.getchunks(ui=self.ui):
598 yield chunk
598 yield chunk
599 outdebug(self.ui, 'end of bundle')
599 outdebug(self.ui, 'end of bundle')
600 yield _pack(_fpartheadersize, 0)
600 yield _pack(_fpartheadersize, 0)
601
601
602
602
603 def salvageoutput(self):
603 def salvageoutput(self):
604 """return a list with a copy of all output parts in the bundle
604 """return a list with a copy of all output parts in the bundle
605
605
606 This is meant to be used during error handling to make sure we preserve
606 This is meant to be used during error handling to make sure we preserve
607 server output"""
607 server output"""
608 salvaged = []
608 salvaged = []
609 for part in self._parts:
609 for part in self._parts:
610 if part.type.startswith('output'):
610 if part.type.startswith('output'):
611 salvaged.append(part.copy())
611 salvaged.append(part.copy())
612 return salvaged
612 return salvaged
613
613
614
614
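A sketch of driving this bundler (assuming ``ui`` is a Mercurial ui object; the stream parameter, part type and file name are arbitrary examples)::

    bundler = bundle20(ui)
    bundler.addparam('examplename', 'examplevalue')  # advisory stream parameter
    bundler.newpart('output', data='hello world\n', mandatory=False)
    with open('example.hg', 'wb') as fp:
        for chunk in bundler.getchunks():
            fp.write(chunk)
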
615 class unpackermixin(object):
615 class unpackermixin(object):
616 """A mixin to extract bytes and struct data from a stream"""
616 """A mixin to extract bytes and struct data from a stream"""
617
617
618 def __init__(self, fp):
618 def __init__(self, fp):
619 self._fp = fp
619 self._fp = fp
620 self._seekable = (util.safehasattr(fp, 'seek') and
621 util.safehasattr(fp, 'tell'))
622
620
623 def _unpack(self, format):
621 def _unpack(self, format):
624 """unpack this struct format from the stream
622 """unpack this struct format from the stream
625
623
626 This method is meant for internal usage by the bundle2 protocol only.
624 This method is meant for internal usage by the bundle2 protocol only.
627 They directly manipulate the low level stream including bundle2 level
625 They directly manipulate the low level stream including bundle2 level
628 instruction.
626 instruction.
629
627
630 Do not use it to implement higher-level logic or methods."""
628 Do not use it to implement higher-level logic or methods."""
631 data = self._readexact(struct.calcsize(format))
629 data = self._readexact(struct.calcsize(format))
632 return _unpack(format, data)
630 return _unpack(format, data)
633
631
634 def _readexact(self, size):
632 def _readexact(self, size):
635 """read exactly <size> bytes from the stream
633 """read exactly <size> bytes from the stream
636
634
637 This method is meant for internal usage by the bundle2 protocol only.
635 This method is meant for internal usage by the bundle2 protocol only.
638 They directly manipulate the low level stream including bundle2 level
636 They directly manipulate the low level stream including bundle2 level
639 instruction.
637 instruction.
640
638
641 Do not use it to implement higher-level logic or methods."""
639 Do not use it to implement higher-level logic or methods."""
642 return changegroup.readexactly(self._fp, size)
640 return changegroup.readexactly(self._fp, size)
643
641
644 def seek(self, offset, whence=0):
645 """move the underlying file pointer"""
646 if self._seekable:
647 return self._fp.seek(offset, whence)
648 else:
649 raise NotImplementedError(_('File pointer is not seekable'))
650
651 def tell(self):
652 """return the file offset, or None if file is not seekable"""
653 if self._seekable:
654 try:
655 return self._fp.tell()
656 except IOError as e:
657 if e.errno == errno.ESPIPE:
658 self._seekable = False
659 else:
660 raise
661 return None
662
663 def getunbundler(ui, fp, magicstring=None):
642 def getunbundler(ui, fp, magicstring=None):
664 """return a valid unbundler object for a given magicstring"""
643 """return a valid unbundler object for a given magicstring"""
665 if magicstring is None:
644 if magicstring is None:
666 magicstring = changegroup.readexactly(fp, 4)
645 magicstring = changegroup.readexactly(fp, 4)
667 magic, version = magicstring[0:2], magicstring[2:4]
646 magic, version = magicstring[0:2], magicstring[2:4]
668 if magic != 'HG':
647 if magic != 'HG':
669 raise error.Abort(_('not a Mercurial bundle'))
648 raise error.Abort(_('not a Mercurial bundle'))
670 unbundlerclass = formatmap.get(version)
649 unbundlerclass = formatmap.get(version)
671 if unbundlerclass is None:
650 if unbundlerclass is None:
672 raise error.Abort(_('unknown bundle version %s') % version)
651 raise error.Abort(_('unknown bundle version %s') % version)
673 unbundler = unbundlerclass(ui, fp)
652 unbundler = unbundlerclass(ui, fp)
674 indebug(ui, 'start processing of %s stream' % magicstring)
653 indebug(ui, 'start processing of %s stream' % magicstring)
675 return unbundler
654 return unbundler
676
655
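Reading such a bundle back could look like this (a sketch; ``ui`` again stands in for a real ui object and ``example.hg`` for a bundle2 file)::

    with open('example.hg', 'rb') as fp:
        unbundler = getunbundler(ui, fp)
        for part in unbundler.iterparts():
            ui.write('part: %s\n' % part.type)
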
677 class unbundle20(unpackermixin):
656 class unbundle20(unpackermixin):
678 """interpret a bundle2 stream
657 """interpret a bundle2 stream
679
658
680 This class is fed with a binary stream and yields parts through its
659 This class is fed with a binary stream and yields parts through its
681 `iterparts` methods."""
660 `iterparts` methods."""
682
661
683 _magicstring = 'HG20'
662 _magicstring = 'HG20'
684
663
685 def __init__(self, ui, fp):
664 def __init__(self, ui, fp):
686 """If header is specified, we do not read it out of the stream."""
665 """If header is specified, we do not read it out of the stream."""
687 self.ui = ui
666 self.ui = ui
688 self._compengine = util.compengines.forbundletype('UN')
667 self._compengine = util.compengines.forbundletype('UN')
689 self._compressed = None
668 self._compressed = None
690 super(unbundle20, self).__init__(fp)
669 super(unbundle20, self).__init__(fp)
691
670
692 @util.propertycache
671 @util.propertycache
693 def params(self):
672 def params(self):
694 """dictionary of stream level parameters"""
673 """dictionary of stream level parameters"""
695 indebug(self.ui, 'reading bundle2 stream parameters')
674 indebug(self.ui, 'reading bundle2 stream parameters')
696 params = {}
675 params = {}
697 paramssize = self._unpack(_fstreamparamsize)[0]
676 paramssize = self._unpack(_fstreamparamsize)[0]
698 if paramssize < 0:
677 if paramssize < 0:
699 raise error.BundleValueError('negative bundle param size: %i'
678 raise error.BundleValueError('negative bundle param size: %i'
700 % paramssize)
679 % paramssize)
701 if paramssize:
680 if paramssize:
702 params = self._readexact(paramssize)
681 params = self._readexact(paramssize)
703 params = self._processallparams(params)
682 params = self._processallparams(params)
704 return params
683 return params
705
684
706 def _processallparams(self, paramsblock):
685 def _processallparams(self, paramsblock):
707 """"""
686 """"""
708 params = util.sortdict()
687 params = util.sortdict()
709 for p in paramsblock.split(' '):
688 for p in paramsblock.split(' '):
710 p = p.split('=', 1)
689 p = p.split('=', 1)
711 p = [urlreq.unquote(i) for i in p]
690 p = [urlreq.unquote(i) for i in p]
712 if len(p) < 2:
691 if len(p) < 2:
713 p.append(None)
692 p.append(None)
714 self._processparam(*p)
693 self._processparam(*p)
715 params[p[0]] = p[1]
694 params[p[0]] = p[1]
716 return params
695 return params
717
696
718
697
719 def _processparam(self, name, value):
698 def _processparam(self, name, value):
720 """process a parameter, applying its effect if needed
699 """process a parameter, applying its effect if needed
721
700
722 Parameters starting with a lower case letter are advisory and will be
701 Parameters starting with a lower case letter are advisory and will be
723 ignored when unknown. Those starting with an upper case letter are
702 ignored when unknown. Those starting with an upper case letter are
724 mandatory and this function will raise a KeyError when unknown.
703 mandatory and this function will raise a KeyError when unknown.
725
704
726 Note: no option are currently supported. Any input will be either
705 Note: no option are currently supported. Any input will be either
727 ignored or failing.
706 ignored or failing.
728 """
707 """
729 if not name:
708 if not name:
730 raise ValueError('empty parameter name')
709 raise ValueError('empty parameter name')
731 if name[0] not in string.letters:
710 if name[0] not in string.letters:
732 raise ValueError('non letter first character: %r' % name)
711 raise ValueError('non letter first character: %r' % name)
733 try:
712 try:
734 handler = b2streamparamsmap[name.lower()]
713 handler = b2streamparamsmap[name.lower()]
735 except KeyError:
714 except KeyError:
736 if name[0].islower():
715 if name[0].islower():
737 indebug(self.ui, "ignoring unknown parameter %r" % name)
716 indebug(self.ui, "ignoring unknown parameter %r" % name)
738 else:
717 else:
739 raise error.BundleUnknownFeatureError(params=(name,))
718 raise error.BundleUnknownFeatureError(params=(name,))
740 else:
719 else:
741 handler(self, name, value)
720 handler(self, name, value)
742
721
743 def _forwardchunks(self):
722 def _forwardchunks(self):
744 """utility to transfer a bundle2 as binary
723 """utility to transfer a bundle2 as binary
745
724
746 This is made necessary by the fact that the 'getbundle' command over 'ssh'
725 This is made necessary by the fact that the 'getbundle' command over 'ssh'
747 has no way to know when the reply ends, relying on the bundle to be
726 has no way to know when the reply ends, relying on the bundle to be
748 interpreted to know its end. This is terrible and we are sorry, but we
727 interpreted to know its end. This is terrible and we are sorry, but we
749 needed to move forward to get general delta enabled.
728 needed to move forward to get general delta enabled.
750 """
729 """
751 yield self._magicstring
730 yield self._magicstring
752 assert 'params' not in vars(self)
731 assert 'params' not in vars(self)
753 paramssize = self._unpack(_fstreamparamsize)[0]
732 paramssize = self._unpack(_fstreamparamsize)[0]
754 if paramssize < 0:
733 if paramssize < 0:
755 raise error.BundleValueError('negative bundle param size: %i'
734 raise error.BundleValueError('negative bundle param size: %i'
756 % paramssize)
735 % paramssize)
757 yield _pack(_fstreamparamsize, paramssize)
736 yield _pack(_fstreamparamsize, paramssize)
758 if paramssize:
737 if paramssize:
759 params = self._readexact(paramssize)
738 params = self._readexact(paramssize)
760 self._processallparams(params)
739 self._processallparams(params)
761 yield params
740 yield params
762 assert self._compengine.bundletype == 'UN'
741 assert self._compengine.bundletype == 'UN'
763 # From there, payload might need to be decompressed
742 # From there, payload might need to be decompressed
764 self._fp = self._compengine.decompressorreader(self._fp)
743 self._fp = self._compengine.decompressorreader(self._fp)
765 emptycount = 0
744 emptycount = 0
766 while emptycount < 2:
745 while emptycount < 2:
767 # so we can brainlessly loop
746 # so we can brainlessly loop
768 assert _fpartheadersize == _fpayloadsize
747 assert _fpartheadersize == _fpayloadsize
769 size = self._unpack(_fpartheadersize)[0]
748 size = self._unpack(_fpartheadersize)[0]
770 yield _pack(_fpartheadersize, size)
749 yield _pack(_fpartheadersize, size)
771 if size:
750 if size:
772 emptycount = 0
751 emptycount = 0
773 else:
752 else:
774 emptycount += 1
753 emptycount += 1
775 continue
754 continue
776 if size == flaginterrupt:
755 if size == flaginterrupt:
777 continue
756 continue
778 elif size < 0:
757 elif size < 0:
779 raise error.BundleValueError('negative chunk size: %i')
758 raise error.BundleValueError('negative chunk size: %i')
780 yield self._readexact(size)
759 yield self._readexact(size)
781
760
782
761
783 def iterparts(self):
762 def iterparts(self):
784 """yield all parts contained in the stream"""
763 """yield all parts contained in the stream"""
785 # make sure param have been loaded
764 # make sure param have been loaded
786 self.params
765 self.params
787 # From there, payload need to be decompressed
766 # From there, payload need to be decompressed
788 self._fp = self._compengine.decompressorreader(self._fp)
767 self._fp = self._compengine.decompressorreader(self._fp)
789 indebug(self.ui, 'start extraction of bundle2 parts')
768 indebug(self.ui, 'start extraction of bundle2 parts')
790 headerblock = self._readpartheader()
769 headerblock = self._readpartheader()
791 while headerblock is not None:
770 while headerblock is not None:
792 part = unbundlepart(self.ui, headerblock, self._fp)
771 part = unbundlepart(self.ui, headerblock, self._fp)
793 yield part
772 yield part
794 part.seek(0, 2)
773 part.seek(0, 2)
795 headerblock = self._readpartheader()
774 headerblock = self._readpartheader()
796 indebug(self.ui, 'end of bundle2 stream')
775 indebug(self.ui, 'end of bundle2 stream')
797
776
798 def _readpartheader(self):
777 def _readpartheader(self):
799 """reads a part header size and return the bytes blob
778 """reads a part header size and return the bytes blob
800
779
801 returns None if empty"""
780 returns None if empty"""
802 headersize = self._unpack(_fpartheadersize)[0]
781 headersize = self._unpack(_fpartheadersize)[0]
803 if headersize < 0:
782 if headersize < 0:
804 raise error.BundleValueError('negative part header size: %i'
783 raise error.BundleValueError('negative part header size: %i'
805 % headersize)
784 % headersize)
806 indebug(self.ui, 'part header size: %i' % headersize)
785 indebug(self.ui, 'part header size: %i' % headersize)
807 if headersize:
786 if headersize:
808 return self._readexact(headersize)
787 return self._readexact(headersize)
809 return None
788 return None
810
789
811 def compressed(self):
790 def compressed(self):
812 self.params # load params
791 self.params # load params
813 return self._compressed
792 return self._compressed
814
793
815 def close(self):
794 def close(self):
816 """close underlying file"""
795 """close underlying file"""
817 if util.safehasattr(self._fp, 'close'):
796 if util.safehasattr(self._fp, 'close'):
818 return self._fp.close()
797 return self._fp.close()
819
798
820 formatmap = {'20': unbundle20}
799 formatmap = {'20': unbundle20}
821
800
822 b2streamparamsmap = {}
801 b2streamparamsmap = {}
823
802
824 def b2streamparamhandler(name):
803 def b2streamparamhandler(name):
825 """register a handler for a stream level parameter"""
804 """register a handler for a stream level parameter"""
826 def decorator(func):
805 def decorator(func):
827 assert name not in formatmap
806 assert name not in formatmap
828 b2streamparamsmap[name] = func
807 b2streamparamsmap[name] = func
829 return func
808 return func
830 return decorator
809 return decorator
831
810
832 @b2streamparamhandler('compression')
811 @b2streamparamhandler('compression')
833 def processcompression(unbundler, param, value):
812 def processcompression(unbundler, param, value):
834 """read compression parameter and install payload decompression"""
813 """read compression parameter and install payload decompression"""
835 if value not in util.compengines.supportedbundletypes:
814 if value not in util.compengines.supportedbundletypes:
836 raise error.BundleUnknownFeatureError(params=(param,),
815 raise error.BundleUnknownFeatureError(params=(param,),
837 values=(value,))
816 values=(value,))
838 unbundler._compengine = util.compengines.forbundletype(value)
817 unbundler._compengine = util.compengines.forbundletype(value)
839 if value is not None:
818 if value is not None:
840 unbundler._compressed = True
819 unbundler._compressed = True
841
820
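Registering an additional stream-level parameter handler follows the same pattern (the parameter name here is hypothetical; it starts with a lower case letter, so it stays advisory)::

    @b2streamparamhandler('examplename')
    def processexamplename(unbundler, param, value):
        """illustrative handler for an advisory 'examplename' parameter"""
        indebug(unbundler.ui, 'examplename set to %r' % value)
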
842 class bundlepart(object):
821 class bundlepart(object):
843 """A bundle2 part contains application level payload
822 """A bundle2 part contains application level payload
844
823
845 The part `type` is used to route the part to the application level
824 The part `type` is used to route the part to the application level
846 handler.
825 handler.
847
826
848 The part payload is contained in ``part.data``. It could be raw bytes or a
827 The part payload is contained in ``part.data``. It could be raw bytes or a
849 generator of byte chunks.
828 generator of byte chunks.
850
829
851 You can add parameters to the part using the ``addparam`` method.
830 You can add parameters to the part using the ``addparam`` method.
852 Parameters can be either mandatory (default) or advisory. Remote side
831 Parameters can be either mandatory (default) or advisory. Remote side
853 should be able to safely ignore the advisory ones.
832 should be able to safely ignore the advisory ones.
854
833
855 Both data and parameters cannot be modified after the generation has begun.
834 Both data and parameters cannot be modified after the generation has begun.
856 """
835 """
857
836
858 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
837 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
859 data='', mandatory=True):
838 data='', mandatory=True):
860 validateparttype(parttype)
839 validateparttype(parttype)
861 self.id = None
840 self.id = None
862 self.type = parttype
841 self.type = parttype
863 self._data = data
842 self._data = data
864 self._mandatoryparams = list(mandatoryparams)
843 self._mandatoryparams = list(mandatoryparams)
865 self._advisoryparams = list(advisoryparams)
844 self._advisoryparams = list(advisoryparams)
866 # checking for duplicated entries
845 # checking for duplicated entries
867 self._seenparams = set()
846 self._seenparams = set()
868 for pname, __ in self._mandatoryparams + self._advisoryparams:
847 for pname, __ in self._mandatoryparams + self._advisoryparams:
869 if pname in self._seenparams:
848 if pname in self._seenparams:
870 raise error.ProgrammingError('duplicated params: %s' % pname)
849 raise error.ProgrammingError('duplicated params: %s' % pname)
871 self._seenparams.add(pname)
850 self._seenparams.add(pname)
872 # status of the part's generation:
851 # status of the part's generation:
873 # - None: not started,
852 # - None: not started,
874 # - False: currently generated,
853 # - False: currently generated,
875 # - True: generation done.
854 # - True: generation done.
876 self._generated = None
855 self._generated = None
877 self.mandatory = mandatory
856 self.mandatory = mandatory
878
857
879 def __repr__(self):
858 def __repr__(self):
880 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
859 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
881 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
860 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
882 % (cls, id(self), self.id, self.type, self.mandatory))
861 % (cls, id(self), self.id, self.type, self.mandatory))
883
862
884 def copy(self):
863 def copy(self):
885 """return a copy of the part
864 """return a copy of the part
886
865
887 The new part has the very same content but no partid assigned yet.
866 The new part has the very same content but no partid assigned yet.
888 Parts with generated data cannot be copied."""
867 Parts with generated data cannot be copied."""
889 assert not util.safehasattr(self.data, 'next')
868 assert not util.safehasattr(self.data, 'next')
890 return self.__class__(self.type, self._mandatoryparams,
869 return self.__class__(self.type, self._mandatoryparams,
891 self._advisoryparams, self._data, self.mandatory)
870 self._advisoryparams, self._data, self.mandatory)
892
871
893 # methods used to define the part content
872 # methods used to define the part content
894 @property
873 @property
895 def data(self):
874 def data(self):
896 return self._data
875 return self._data
897
876
898 @data.setter
877 @data.setter
899 def data(self, data):
878 def data(self, data):
900 if self._generated is not None:
879 if self._generated is not None:
901 raise error.ReadOnlyPartError('part is being generated')
880 raise error.ReadOnlyPartError('part is being generated')
902 self._data = data
881 self._data = data
903
882
904 @property
883 @property
905 def mandatoryparams(self):
884 def mandatoryparams(self):
906 # make it an immutable tuple to force people through ``addparam``
885 # make it an immutable tuple to force people through ``addparam``
907 return tuple(self._mandatoryparams)
886 return tuple(self._mandatoryparams)
908
887
909 @property
888 @property
910 def advisoryparams(self):
889 def advisoryparams(self):
911 # make it an immutable tuple to force people through ``addparam``
890 # make it an immutable tuple to force people through ``addparam``
912 return tuple(self._advisoryparams)
891 return tuple(self._advisoryparams)
913
892
914 def addparam(self, name, value='', mandatory=True):
893 def addparam(self, name, value='', mandatory=True):
915 """add a parameter to the part
894 """add a parameter to the part
916
895
917 If 'mandatory' is set to True, the remote handler must claim support
896 If 'mandatory' is set to True, the remote handler must claim support
918 for this parameter or the unbundling will be aborted.
897 for this parameter or the unbundling will be aborted.
919
898
920 The 'name' and 'value' cannot exceed 255 bytes each.
899 The 'name' and 'value' cannot exceed 255 bytes each.
921 """
900 """
922 if self._generated is not None:
901 if self._generated is not None:
923 raise error.ReadOnlyPartError('part is being generated')
902 raise error.ReadOnlyPartError('part is being generated')
924 if name in self._seenparams:
903 if name in self._seenparams:
925 raise ValueError('duplicated params: %s' % name)
904 raise ValueError('duplicated params: %s' % name)
926 self._seenparams.add(name)
905 self._seenparams.add(name)
927 params = self._advisoryparams
906 params = self._advisoryparams
928 if mandatory:
907 if mandatory:
929 params = self._mandatoryparams
908 params = self._mandatoryparams
930 params.append((name, value))
909 params.append((name, value))
931
910
932 # methods used to generate the bundle2 stream
911 # methods used to generate the bundle2 stream
933 def getchunks(self, ui):
912 def getchunks(self, ui):
934 if self._generated is not None:
913 if self._generated is not None:
935 raise error.ProgrammingError('part can only be consumed once')
914 raise error.ProgrammingError('part can only be consumed once')
936 self._generated = False
915 self._generated = False
937
916
938 if ui.debugflag:
917 if ui.debugflag:
939 msg = ['bundle2-output-part: "%s"' % self.type]
918 msg = ['bundle2-output-part: "%s"' % self.type]
940 if not self.mandatory:
919 if not self.mandatory:
941 msg.append(' (advisory)')
920 msg.append(' (advisory)')
942 nbmp = len(self.mandatoryparams)
921 nbmp = len(self.mandatoryparams)
943 nbap = len(self.advisoryparams)
922 nbap = len(self.advisoryparams)
944 if nbmp or nbap:
923 if nbmp or nbap:
945 msg.append(' (params:')
924 msg.append(' (params:')
946 if nbmp:
925 if nbmp:
947 msg.append(' %i mandatory' % nbmp)
926 msg.append(' %i mandatory' % nbmp)
948 if nbap:
927 if nbap:
949 msg.append(' %i advisory' % nbap)
928 msg.append(' %i advisory' % nbap)
950 msg.append(')')
929 msg.append(')')
951 if not self.data:
930 if not self.data:
952 msg.append(' empty payload')
931 msg.append(' empty payload')
953 elif util.safehasattr(self.data, 'next'):
932 elif util.safehasattr(self.data, 'next'):
954 msg.append(' streamed payload')
933 msg.append(' streamed payload')
955 else:
934 else:
956 msg.append(' %i bytes payload' % len(self.data))
935 msg.append(' %i bytes payload' % len(self.data))
957 msg.append('\n')
936 msg.append('\n')
958 ui.debug(''.join(msg))
937 ui.debug(''.join(msg))
959
938
960 #### header
939 #### header
961 if self.mandatory:
940 if self.mandatory:
962 parttype = self.type.upper()
941 parttype = self.type.upper()
963 else:
942 else:
964 parttype = self.type.lower()
943 parttype = self.type.lower()
965 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
944 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
966 ## parttype
945 ## parttype
967 header = [_pack(_fparttypesize, len(parttype)),
946 header = [_pack(_fparttypesize, len(parttype)),
968 parttype, _pack(_fpartid, self.id),
947 parttype, _pack(_fpartid, self.id),
969 ]
948 ]
970 ## parameters
949 ## parameters
971 # count
950 # count
972 manpar = self.mandatoryparams
951 manpar = self.mandatoryparams
973 advpar = self.advisoryparams
952 advpar = self.advisoryparams
974 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
953 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
975 # size
954 # size
976 parsizes = []
955 parsizes = []
977 for key, value in manpar:
956 for key, value in manpar:
978 parsizes.append(len(key))
957 parsizes.append(len(key))
979 parsizes.append(len(value))
958 parsizes.append(len(value))
980 for key, value in advpar:
959 for key, value in advpar:
981 parsizes.append(len(key))
960 parsizes.append(len(key))
982 parsizes.append(len(value))
961 parsizes.append(len(value))
983 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
962 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
984 header.append(paramsizes)
963 header.append(paramsizes)
985 # key, value
964 # key, value
986 for key, value in manpar:
965 for key, value in manpar:
987 header.append(key)
966 header.append(key)
988 header.append(value)
967 header.append(value)
989 for key, value in advpar:
968 for key, value in advpar:
990 header.append(key)
969 header.append(key)
991 header.append(value)
970 header.append(value)
992 ## finalize header
971 ## finalize header
993 headerchunk = ''.join(header)
972 headerchunk = ''.join(header)
994 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
973 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
995 yield _pack(_fpartheadersize, len(headerchunk))
974 yield _pack(_fpartheadersize, len(headerchunk))
996 yield headerchunk
975 yield headerchunk
997 ## payload
976 ## payload
998 try:
977 try:
999 for chunk in self._payloadchunks():
978 for chunk in self._payloadchunks():
1000 outdebug(ui, 'payload chunk size: %i' % len(chunk))
979 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1001 yield _pack(_fpayloadsize, len(chunk))
980 yield _pack(_fpayloadsize, len(chunk))
1002 yield chunk
981 yield chunk
1003 except GeneratorExit:
982 except GeneratorExit:
1004 # GeneratorExit means that nobody is listening for our
983 # GeneratorExit means that nobody is listening for our
1005 # results anyway, so just bail quickly rather than trying
984 # results anyway, so just bail quickly rather than trying
1006 # to produce an error part.
985 # to produce an error part.
1007 ui.debug('bundle2-generatorexit\n')
986 ui.debug('bundle2-generatorexit\n')
1008 raise
987 raise
1009 except BaseException as exc:
988 except BaseException as exc:
1010 # backup exception data for later
989 # backup exception data for later
1011 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
990 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1012 % exc)
991 % exc)
1013 exc_info = sys.exc_info()
992 exc_info = sys.exc_info()
1014 msg = 'unexpected error: %s' % exc
993 msg = 'unexpected error: %s' % exc
1015 interpart = bundlepart('error:abort', [('message', msg)],
994 interpart = bundlepart('error:abort', [('message', msg)],
1016 mandatory=False)
995 mandatory=False)
1017 interpart.id = 0
996 interpart.id = 0
1018 yield _pack(_fpayloadsize, -1)
997 yield _pack(_fpayloadsize, -1)
1019 for chunk in interpart.getchunks(ui=ui):
998 for chunk in interpart.getchunks(ui=ui):
1020 yield chunk
999 yield chunk
1021 outdebug(ui, 'closing payload chunk')
1000 outdebug(ui, 'closing payload chunk')
1022 # abort current part payload
1001 # abort current part payload
1023 yield _pack(_fpayloadsize, 0)
1002 yield _pack(_fpayloadsize, 0)
1024 if pycompat.ispy3:
1003 if pycompat.ispy3:
1025 raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
1004 raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
1026 else:
1005 else:
1027 exec("""raise exc_info[0], exc_info[1], exc_info[2]""")
1006 exec("""raise exc_info[0], exc_info[1], exc_info[2]""")
1028 # end of payload
1007 # end of payload
1029 outdebug(ui, 'closing payload chunk')
1008 outdebug(ui, 'closing payload chunk')
1030 yield _pack(_fpayloadsize, 0)
1009 yield _pack(_fpayloadsize, 0)
1031 self._generated = True
1010 self._generated = True
1032
1011
1033 def _payloadchunks(self):
1012 def _payloadchunks(self):
1034 """yield chunks of a the part payload
1013 """yield chunks of a the part payload
1035
1014
1036 Exists to handle the different methods to provide data to a part."""
1015 Exists to handle the different methods to provide data to a part."""
1037 # we only support fixed size data now.
1016 # we only support fixed size data now.
1038 # This will be improved in the future.
1017 # This will be improved in the future.
1039 if util.safehasattr(self.data, 'next'):
1018 if util.safehasattr(self.data, 'next'):
1040 buff = util.chunkbuffer(self.data)
1019 buff = util.chunkbuffer(self.data)
1041 chunk = buff.read(preferedchunksize)
1020 chunk = buff.read(preferedchunksize)
1042 while chunk:
1021 while chunk:
1043 yield chunk
1022 yield chunk
1044 chunk = buff.read(preferedchunksize)
1023 chunk = buff.read(preferedchunksize)
1045 elif len(self.data):
1024 elif len(self.data):
1046 yield self.data
1025 yield self.data
1047
1026
1048
1027
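# Illustrative sketch only: building a part by hand and serializing it with
# getchunks(). A real caller would add the part to a bundler, which assigns
# part.id; here the id is set manually, and 'ui' is assumed to be a
# Mercurial ui object available in the calling context.
part = bundlepart('output', data='hello from the sketch\n')
part.addparam('verbosity', 'low', mandatory=False)
part.id = 0  # normally assigned when the part is added to a bundle
raw = ''.join(part.getchunks(ui))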
1049 flaginterrupt = -1
1028 flaginterrupt = -1
1050
1029
1051 class interrupthandler(unpackermixin):
1030 class interrupthandler(unpackermixin):
1052 """read one part and process it with restricted capability
1031 """read one part and process it with restricted capability
1053
1032
1054 This allows transmitting exceptions raised on the producer side during part
1033 This allows transmitting exceptions raised on the producer side during part
1055 iteration while the consumer is reading a part.
1034 iteration while the consumer is reading a part.
1056
1035
1057 Parts processed in this manner only have access to a ui object."""
1036 Parts processed in this manner only have access to a ui object."""
1058
1037
1059 def __init__(self, ui, fp):
1038 def __init__(self, ui, fp):
1060 super(interrupthandler, self).__init__(fp)
1039 super(interrupthandler, self).__init__(fp)
1061 self.ui = ui
1040 self.ui = ui
1062
1041
1063 def _readpartheader(self):
1042 def _readpartheader(self):
1064 """reads a part header size and return the bytes blob
1043 """reads a part header size and return the bytes blob
1065
1044
1066 returns None if empty"""
1045 returns None if empty"""
1067 headersize = self._unpack(_fpartheadersize)[0]
1046 headersize = self._unpack(_fpartheadersize)[0]
1068 if headersize < 0:
1047 if headersize < 0:
1069 raise error.BundleValueError('negative part header size: %i'
1048 raise error.BundleValueError('negative part header size: %i'
1070 % headersize)
1049 % headersize)
1071 indebug(self.ui, 'part header size: %i\n' % headersize)
1050 indebug(self.ui, 'part header size: %i\n' % headersize)
1072 if headersize:
1051 if headersize:
1073 return self._readexact(headersize)
1052 return self._readexact(headersize)
1074 return None
1053 return None
1075
1054
1076 def __call__(self):
1055 def __call__(self):
1077
1056
1078 self.ui.debug('bundle2-input-stream-interrupt:'
1057 self.ui.debug('bundle2-input-stream-interrupt:'
1079 ' opening out of band context\n')
1058 ' opening out of band context\n')
1080 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1059 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1081 headerblock = self._readpartheader()
1060 headerblock = self._readpartheader()
1082 if headerblock is None:
1061 if headerblock is None:
1083 indebug(self.ui, 'no part found during interruption.')
1062 indebug(self.ui, 'no part found during interruption.')
1084 return
1063 return
1085 part = unbundlepart(self.ui, headerblock, self._fp)
1064 part = unbundlepart(self.ui, headerblock, self._fp)
1086 op = interruptoperation(self.ui)
1065 op = interruptoperation(self.ui)
1087 _processpart(op, part)
1066 _processpart(op, part)
1088 self.ui.debug('bundle2-input-stream-interrupt:'
1067 self.ui.debug('bundle2-input-stream-interrupt:'
1089 ' closing out of band context\n')
1068 ' closing out of band context\n')
1090
1069
1091 class interruptoperation(object):
1070 class interruptoperation(object):
1092 """A limited operation to be use by part handler during interruption
1071 """A limited operation to be use by part handler during interruption
1093
1072
1094 It only has access to a ui object.
1073 It only has access to a ui object.
1095 """
1074 """
1096
1075
1097 def __init__(self, ui):
1076 def __init__(self, ui):
1098 self.ui = ui
1077 self.ui = ui
1099 self.reply = None
1078 self.reply = None
1100 self.captureoutput = False
1079 self.captureoutput = False
1101
1080
1102 @property
1081 @property
1103 def repo(self):
1082 def repo(self):
1104 raise error.ProgrammingError('no repo access from stream interruption')
1083 raise error.ProgrammingError('no repo access from stream interruption')
1105
1084
1106 def gettransaction(self):
1085 def gettransaction(self):
1107 raise TransactionUnavailable('no repo access from stream interruption')
1086 raise TransactionUnavailable('no repo access from stream interruption')
1108
1087
1109 class unbundlepart(unpackermixin):
1088 class unbundlepart(unpackermixin):
1110 """a bundle part read from a bundle"""
1089 """a bundle part read from a bundle"""
1111
1090
1112 def __init__(self, ui, header, fp):
1091 def __init__(self, ui, header, fp):
1113 super(unbundlepart, self).__init__(fp)
1092 super(unbundlepart, self).__init__(fp)
1093 self._seekable = (util.safehasattr(fp, 'seek') and
1094 util.safehasattr(fp, 'tell'))
1114 self.ui = ui
1095 self.ui = ui
1115 # unbundle state attr
1096 # unbundle state attr
1116 self._headerdata = header
1097 self._headerdata = header
1117 self._headeroffset = 0
1098 self._headeroffset = 0
1118 self._initialized = False
1099 self._initialized = False
1119 self.consumed = False
1100 self.consumed = False
1120 # part data
1101 # part data
1121 self.id = None
1102 self.id = None
1122 self.type = None
1103 self.type = None
1123 self.mandatoryparams = None
1104 self.mandatoryparams = None
1124 self.advisoryparams = None
1105 self.advisoryparams = None
1125 self.params = None
1106 self.params = None
1126 self.mandatorykeys = ()
1107 self.mandatorykeys = ()
1127 self._payloadstream = None
1108 self._payloadstream = None
1128 self._readheader()
1109 self._readheader()
1129 self._mandatory = None
1110 self._mandatory = None
1130 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1111 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1131 self._pos = 0
1112 self._pos = 0
1132
1113
1133 def _fromheader(self, size):
1114 def _fromheader(self, size):
1134 """return the next <size> byte from the header"""
1115 """return the next <size> byte from the header"""
1135 offset = self._headeroffset
1116 offset = self._headeroffset
1136 data = self._headerdata[offset:(offset + size)]
1117 data = self._headerdata[offset:(offset + size)]
1137 self._headeroffset = offset + size
1118 self._headeroffset = offset + size
1138 return data
1119 return data
1139
1120
1140 def _unpackheader(self, format):
1121 def _unpackheader(self, format):
1141 """read given format from header
1122 """read given format from header
1142
1123
1143 This automatically computes the size of the format to read."""
1124 This automatically computes the size of the format to read."""
1144 data = self._fromheader(struct.calcsize(format))
1125 data = self._fromheader(struct.calcsize(format))
1145 return _unpack(format, data)
1126 return _unpack(format, data)
1146
1127
1147 def _initparams(self, mandatoryparams, advisoryparams):
1128 def _initparams(self, mandatoryparams, advisoryparams):
1148 """internal function to setup all logic related parameters"""
1129 """internal function to setup all logic related parameters"""
1149 # make it read only to prevent people touching it by mistake.
1130 # make it read only to prevent people touching it by mistake.
1150 self.mandatoryparams = tuple(mandatoryparams)
1131 self.mandatoryparams = tuple(mandatoryparams)
1151 self.advisoryparams = tuple(advisoryparams)
1132 self.advisoryparams = tuple(advisoryparams)
1152 # user friendly UI
1133 # user friendly UI
1153 self.params = util.sortdict(self.mandatoryparams)
1134 self.params = util.sortdict(self.mandatoryparams)
1154 self.params.update(self.advisoryparams)
1135 self.params.update(self.advisoryparams)
1155 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1136 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1156
1137
1157 def _payloadchunks(self, chunknum=0):
1138 def _payloadchunks(self, chunknum=0):
1158 '''seek to specified chunk and start yielding data'''
1139 '''seek to specified chunk and start yielding data'''
1159 if len(self._chunkindex) == 0:
1140 if len(self._chunkindex) == 0:
1160 assert chunknum == 0, 'Must start with chunk 0'
1141 assert chunknum == 0, 'Must start with chunk 0'
1161 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1142 self._chunkindex.append((0, self._tellfp()))
1162 else:
1143 else:
1163 assert chunknum < len(self._chunkindex), \
1144 assert chunknum < len(self._chunkindex), \
1164 'Unknown chunk %d' % chunknum
1145 'Unknown chunk %d' % chunknum
1165 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1146 self._seekfp(self._chunkindex[chunknum][1])
1166
1147
1167 pos = self._chunkindex[chunknum][0]
1148 pos = self._chunkindex[chunknum][0]
1168 payloadsize = self._unpack(_fpayloadsize)[0]
1149 payloadsize = self._unpack(_fpayloadsize)[0]
1169 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1150 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1170 while payloadsize:
1151 while payloadsize:
1171 if payloadsize == flaginterrupt:
1152 if payloadsize == flaginterrupt:
1172 # interruption detection, the handler will now read a
1153 # interruption detection, the handler will now read a
1173 # single part and process it.
1154 # single part and process it.
1174 interrupthandler(self.ui, self._fp)()
1155 interrupthandler(self.ui, self._fp)()
1175 elif payloadsize < 0:
1156 elif payloadsize < 0:
1176 msg = 'negative payload chunk size: %i' % payloadsize
1157 msg = 'negative payload chunk size: %i' % payloadsize
1177 raise error.BundleValueError(msg)
1158 raise error.BundleValueError(msg)
1178 else:
1159 else:
1179 result = self._readexact(payloadsize)
1160 result = self._readexact(payloadsize)
1180 chunknum += 1
1161 chunknum += 1
1181 pos += payloadsize
1162 pos += payloadsize
1182 if chunknum == len(self._chunkindex):
1163 if chunknum == len(self._chunkindex):
1183 self._chunkindex.append((pos,
1164 self._chunkindex.append((pos, self._tellfp()))
1184 super(unbundlepart, self).tell()))
1185 yield result
1165 yield result
1186 payloadsize = self._unpack(_fpayloadsize)[0]
1166 payloadsize = self._unpack(_fpayloadsize)[0]
1187 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1167 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1188
1168
1189 def _findchunk(self, pos):
1169 def _findchunk(self, pos):
1190 '''for a given payload position, return a chunk number and offset'''
1170 '''for a given payload position, return a chunk number and offset'''
1191 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1171 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1192 if ppos == pos:
1172 if ppos == pos:
1193 return chunk, 0
1173 return chunk, 0
1194 elif ppos > pos:
1174 elif ppos > pos:
1195 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1175 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1196 raise ValueError('Unknown chunk')
1176 raise ValueError('Unknown chunk')
1197
1177
1198 def _readheader(self):
1178 def _readheader(self):
1199 """read the header and setup the object"""
1179 """read the header and setup the object"""
1200 typesize = self._unpackheader(_fparttypesize)[0]
1180 typesize = self._unpackheader(_fparttypesize)[0]
1201 self.type = self._fromheader(typesize)
1181 self.type = self._fromheader(typesize)
1202 indebug(self.ui, 'part type: "%s"' % self.type)
1182 indebug(self.ui, 'part type: "%s"' % self.type)
1203 self.id = self._unpackheader(_fpartid)[0]
1183 self.id = self._unpackheader(_fpartid)[0]
1204 indebug(self.ui, 'part id: "%s"' % self.id)
1184 indebug(self.ui, 'part id: "%s"' % self.id)
1205 # extract mandatory bit from type
1185 # extract mandatory bit from type
1206 self.mandatory = (self.type != self.type.lower())
1186 self.mandatory = (self.type != self.type.lower())
1207 self.type = self.type.lower()
1187 self.type = self.type.lower()
1208 ## reading parameters
1188 ## reading parameters
1209 # param count
1189 # param count
1210 mancount, advcount = self._unpackheader(_fpartparamcount)
1190 mancount, advcount = self._unpackheader(_fpartparamcount)
1211 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1191 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1212 # param size
1192 # param size
1213 fparamsizes = _makefpartparamsizes(mancount + advcount)
1193 fparamsizes = _makefpartparamsizes(mancount + advcount)
1214 paramsizes = self._unpackheader(fparamsizes)
1194 paramsizes = self._unpackheader(fparamsizes)
1215 # make it a list of couple again
1215 # make it a list of pairs again
1196 # make it a list of pairs again
1196 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1217 # split mandatory from advisory
1197 # split mandatory from advisory
1218 mansizes = paramsizes[:mancount]
1198 mansizes = paramsizes[:mancount]
1219 advsizes = paramsizes[mancount:]
1199 advsizes = paramsizes[mancount:]
1220 # retrieve param value
1200 # retrieve param value
1221 manparams = []
1201 manparams = []
1222 for key, value in mansizes:
1202 for key, value in mansizes:
1223 manparams.append((self._fromheader(key), self._fromheader(value)))
1203 manparams.append((self._fromheader(key), self._fromheader(value)))
1224 advparams = []
1204 advparams = []
1225 for key, value in advsizes:
1205 for key, value in advsizes:
1226 advparams.append((self._fromheader(key), self._fromheader(value)))
1206 advparams.append((self._fromheader(key), self._fromheader(value)))
1227 self._initparams(manparams, advparams)
1207 self._initparams(manparams, advparams)
1228 ## part payload
1208 ## part payload
1229 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1209 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1230 # we read the data, tell it
1210 # we read the data, tell it
1231 self._initialized = True
1211 self._initialized = True
1232
1212
1233 def read(self, size=None):
1213 def read(self, size=None):
1234 """read payload data"""
1214 """read payload data"""
1235 if not self._initialized:
1215 if not self._initialized:
1236 self._readheader()
1216 self._readheader()
1237 if size is None:
1217 if size is None:
1238 data = self._payloadstream.read()
1218 data = self._payloadstream.read()
1239 else:
1219 else:
1240 data = self._payloadstream.read(size)
1220 data = self._payloadstream.read(size)
1241 self._pos += len(data)
1221 self._pos += len(data)
1242 if size is None or len(data) < size:
1222 if size is None or len(data) < size:
1243 if not self.consumed and self._pos:
1223 if not self.consumed and self._pos:
1244 self.ui.debug('bundle2-input-part: total payload size %i\n'
1224 self.ui.debug('bundle2-input-part: total payload size %i\n'
1245 % self._pos)
1225 % self._pos)
1246 self.consumed = True
1226 self.consumed = True
1247 return data
1227 return data
1248
1228
1249 def tell(self):
1229 def tell(self):
1250 return self._pos
1230 return self._pos
1251
1231
1252 def seek(self, offset, whence=0):
1232 def seek(self, offset, whence=0):
1253 if whence == 0:
1233 if whence == 0:
1254 newpos = offset
1234 newpos = offset
1255 elif whence == 1:
1235 elif whence == 1:
1256 newpos = self._pos + offset
1236 newpos = self._pos + offset
1257 elif whence == 2:
1237 elif whence == 2:
1258 if not self.consumed:
1238 if not self.consumed:
1259 self.read()
1239 self.read()
1260 newpos = self._chunkindex[-1][0] - offset
1240 newpos = self._chunkindex[-1][0] - offset
1261 else:
1241 else:
1262 raise ValueError('Unknown whence value: %r' % (whence,))
1242 raise ValueError('Unknown whence value: %r' % (whence,))
1263
1243
1264 if newpos > self._chunkindex[-1][0] and not self.consumed:
1244 if newpos > self._chunkindex[-1][0] and not self.consumed:
1265 self.read()
1245 self.read()
1266 if not 0 <= newpos <= self._chunkindex[-1][0]:
1246 if not 0 <= newpos <= self._chunkindex[-1][0]:
1267 raise ValueError('Offset out of range')
1247 raise ValueError('Offset out of range')
1268
1248
1269 if self._pos != newpos:
1249 if self._pos != newpos:
1270 chunk, internaloffset = self._findchunk(newpos)
1250 chunk, internaloffset = self._findchunk(newpos)
1271 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1251 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1272 adjust = self.read(internaloffset)
1252 adjust = self.read(internaloffset)
1273 if len(adjust) != internaloffset:
1253 if len(adjust) != internaloffset:
1274 raise error.Abort(_('Seek failed\n'))
1254 raise error.Abort(_('Seek failed\n'))
1275 self._pos = newpos
1255 self._pos = newpos
1276
1256
1257 def _seekfp(self, offset, whence=0):
1258 """move the underlying file pointer
1259
1260 This method is meant for internal usage by the bundle2 protocol only.
1261 They directly manipulate the low level stream including bundle2 level
1262 instruction.
1263
1264 Do not use it to implement higher-level logic or methods."""
1265 if self._seekable:
1266 return self._fp.seek(offset, whence)
1267 else:
1268 raise NotImplementedError(_('File pointer is not seekable'))
1269
1270 def _tellfp(self):
1271 """return the file offset, or None if file is not seekable
1272
1273 This method is meant for internal usage by the bundle2 protocol only.
1274 It directly manipulates the low-level stream, including bundle2-level
1275 instructions.
1276
1277 Do not use it to implement higher-level logic or methods."""
1278 if self._seekable:
1279 try:
1280 return self._fp.tell()
1281 except IOError as e:
1282 if e.errno == errno.ESPIPE:
1283 self._seekable = False
1284 else:
1285 raise
1286 return None
1287
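# Illustrative sketch only: the ESPIPE-tolerant pattern used by _tellfp()
# above, written as a standalone helper. 'safetell' is a hypothetical name,
# not part of bundle2; it returns None instead of raising when the file
# object is a pipe or has no usable tell().
import errno

def safetell(fp):
    """return fp.tell(), or None when fp is not seekable (e.g. a pipe)"""
    try:
        return fp.tell()
    except AttributeError:
        return None
    except IOError as e:
        if e.errno != errno.ESPIPE:
            raise
        return None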
1277 # These are only the static capabilities.
1288 # These are only the static capabilities.
1278 # Check the 'getrepocaps' function for the rest.
1289 # Check the 'getrepocaps' function for the rest.
1279 capabilities = {'HG20': (),
1290 capabilities = {'HG20': (),
1280 'error': ('abort', 'unsupportedcontent', 'pushraced',
1291 'error': ('abort', 'unsupportedcontent', 'pushraced',
1281 'pushkey'),
1292 'pushkey'),
1282 'listkeys': (),
1293 'listkeys': (),
1283 'pushkey': (),
1294 'pushkey': (),
1284 'digests': tuple(sorted(util.DIGESTS.keys())),
1295 'digests': tuple(sorted(util.DIGESTS.keys())),
1285 'remote-changegroup': ('http', 'https'),
1296 'remote-changegroup': ('http', 'https'),
1286 'hgtagsfnodes': (),
1297 'hgtagsfnodes': (),
1287 }
1298 }
1288
1299
1289 def getrepocaps(repo, allowpushback=False):
1300 def getrepocaps(repo, allowpushback=False):
1290 """return the bundle2 capabilities for a given repo
1301 """return the bundle2 capabilities for a given repo
1291
1302
1292 Exists to allow extensions (like evolution) to mutate the capabilities.
1303 Exists to allow extensions (like evolution) to mutate the capabilities.
1293 """
1304 """
1294 caps = capabilities.copy()
1305 caps = capabilities.copy()
1295 caps['changegroup'] = tuple(sorted(
1306 caps['changegroup'] = tuple(sorted(
1296 changegroup.supportedincomingversions(repo)))
1307 changegroup.supportedincomingversions(repo)))
1297 if obsolete.isenabled(repo, obsolete.exchangeopt):
1308 if obsolete.isenabled(repo, obsolete.exchangeopt):
1298 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1309 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1299 caps['obsmarkers'] = supportedformat
1310 caps['obsmarkers'] = supportedformat
1300 if allowpushback:
1311 if allowpushback:
1301 caps['pushback'] = ()
1312 caps['pushback'] = ()
1302 return caps
1313 return caps
1303
1314
1304 def bundle2caps(remote):
1315 def bundle2caps(remote):
1305 """return the bundle capabilities of a peer as dict"""
1316 """return the bundle capabilities of a peer as dict"""
1306 raw = remote.capable('bundle2')
1317 raw = remote.capable('bundle2')
1307 if not raw and raw != '':
1318 if not raw and raw != '':
1308 return {}
1319 return {}
1309 capsblob = urlreq.unquote(remote.capable('bundle2'))
1320 capsblob = urlreq.unquote(remote.capable('bundle2'))
1310 return decodecaps(capsblob)
1321 return decodecaps(capsblob)
1311
1322
1312 def obsmarkersversion(caps):
1323 def obsmarkersversion(caps):
1313 """extract the list of supported obsmarkers versions from a bundle2caps dict
1324 """extract the list of supported obsmarkers versions from a bundle2caps dict
1314 """
1325 """
1315 obscaps = caps.get('obsmarkers', ())
1326 obscaps = caps.get('obsmarkers', ())
1316 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1327 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1317
1328
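# Illustrative sketch only: using obsmarkersversion() above on a hand-built
# capabilities dict. The 'V0'/'V1' values are examples of the 'V%i' encoding
# produced by getrepocaps(), not data queried from a real peer.
example_caps = {'obsmarkers': ('V0', 'V1')}
assert obsmarkersversion(example_caps) == [0, 1]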
1318 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1329 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1319 compopts=None):
1330 compopts=None):
1320 """Write a bundle file and return its filename.
1331 """Write a bundle file and return its filename.
1321
1332
1322 Existing files will not be overwritten.
1333 Existing files will not be overwritten.
1323 If no filename is specified, a temporary file is created.
1334 If no filename is specified, a temporary file is created.
1324 bz2 compression can be turned off.
1335 bz2 compression can be turned off.
1325 The bundle file will be deleted in case of errors.
1336 The bundle file will be deleted in case of errors.
1326 """
1337 """
1327
1338
1328 if bundletype == "HG20":
1339 if bundletype == "HG20":
1329 bundle = bundle20(ui)
1340 bundle = bundle20(ui)
1330 bundle.setcompression(compression, compopts)
1341 bundle.setcompression(compression, compopts)
1331 part = bundle.newpart('changegroup', data=cg.getchunks())
1342 part = bundle.newpart('changegroup', data=cg.getchunks())
1332 part.addparam('version', cg.version)
1343 part.addparam('version', cg.version)
1333 if 'clcount' in cg.extras:
1344 if 'clcount' in cg.extras:
1334 part.addparam('nbchanges', str(cg.extras['clcount']),
1345 part.addparam('nbchanges', str(cg.extras['clcount']),
1335 mandatory=False)
1346 mandatory=False)
1336 chunkiter = bundle.getchunks()
1347 chunkiter = bundle.getchunks()
1337 else:
1348 else:
1338 # compression argument is only for the bundle2 case
1349 # compression argument is only for the bundle2 case
1339 assert compression is None
1350 assert compression is None
1340 if cg.version != '01':
1351 if cg.version != '01':
1341 raise error.Abort(_('old bundle types only supports v1 '
1352 raise error.Abort(_('old bundle types only supports v1 '
1342 'changegroups'))
1353 'changegroups'))
1343 header, comp = bundletypes[bundletype]
1354 header, comp = bundletypes[bundletype]
1344 if comp not in util.compengines.supportedbundletypes:
1355 if comp not in util.compengines.supportedbundletypes:
1345 raise error.Abort(_('unknown stream compression type: %s')
1356 raise error.Abort(_('unknown stream compression type: %s')
1346 % comp)
1357 % comp)
1347 compengine = util.compengines.forbundletype(comp)
1358 compengine = util.compengines.forbundletype(comp)
1348 def chunkiter():
1359 def chunkiter():
1349 yield header
1360 yield header
1350 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1361 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1351 yield chunk
1362 yield chunk
1352 chunkiter = chunkiter()
1363 chunkiter = chunkiter()
1353
1364
1354 # parse the changegroup data, otherwise we will block
1365 # parse the changegroup data, otherwise we will block
1355 # in case of sshrepo because we don't know the end of the stream
1366 # in case of sshrepo because we don't know the end of the stream
1356 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1367 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1357
1368
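# Illustrative sketch only: calling writebundle() above for the bundle2
# case. 'ui' and 'cg' (a changegroup object providing getchunks(), version
# and extras) are assumed to exist in the caller; passing filename=None
# lets a temporary file be created, per the docstring above.
bundlepath = writebundle(ui, cg, None, 'HG20')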
1358 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1369 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1359 def handlechangegroup(op, inpart):
1370 def handlechangegroup(op, inpart):
1360 """apply a changegroup part on the repo
1371 """apply a changegroup part on the repo
1361
1372
1362 This is a very early implementation that will see massive rework before being
1373 This is a very early implementation that will see massive rework before being
1363 inflicted on any end-user.
1374 inflicted on any end-user.
1364 """
1375 """
1365 # Make sure we trigger a transaction creation
1376 # Make sure we trigger a transaction creation
1366 #
1377 #
1367 # The addchangegroup function will get a transaction object by itself, but
1378 # The addchangegroup function will get a transaction object by itself, but
1368 # we need to make sure we trigger the creation of a transaction object used
1379 # we need to make sure we trigger the creation of a transaction object used
1369 # for the whole processing scope.
1380 # for the whole processing scope.
1370 op.gettransaction()
1381 op.gettransaction()
1371 unpackerversion = inpart.params.get('version', '01')
1382 unpackerversion = inpart.params.get('version', '01')
1372 # We should raise an appropriate exception here
1383 # We should raise an appropriate exception here
1373 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1384 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1374 # the source and url passed here are overwritten by the one contained in
1385 # the source and url passed here are overwritten by the one contained in
1375 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1386 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1376 nbchangesets = None
1387 nbchangesets = None
1377 if 'nbchanges' in inpart.params:
1388 if 'nbchanges' in inpart.params:
1378 nbchangesets = int(inpart.params.get('nbchanges'))
1389 nbchangesets = int(inpart.params.get('nbchanges'))
1379 if ('treemanifest' in inpart.params and
1390 if ('treemanifest' in inpart.params and
1380 'treemanifest' not in op.repo.requirements):
1391 'treemanifest' not in op.repo.requirements):
1381 if len(op.repo.changelog) != 0:
1392 if len(op.repo.changelog) != 0:
1382 raise error.Abort(_(
1393 raise error.Abort(_(
1383 "bundle contains tree manifests, but local repo is "
1394 "bundle contains tree manifests, but local repo is "
1384 "non-empty and does not use tree manifests"))
1395 "non-empty and does not use tree manifests"))
1385 op.repo.requirements.add('treemanifest')
1396 op.repo.requirements.add('treemanifest')
1386 op.repo._applyopenerreqs()
1397 op.repo._applyopenerreqs()
1387 op.repo._writerequirements()
1398 op.repo._writerequirements()
1388 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1399 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1389 op.records.add('changegroup', {'return': ret})
1400 op.records.add('changegroup', {'return': ret})
1390 if op.reply is not None:
1401 if op.reply is not None:
1391 # This is definitely not the final form of this
1402 # This is definitely not the final form of this
1392 # return. But one needs to start somewhere.
1403 # return. But one needs to start somewhere.
1393 part = op.reply.newpart('reply:changegroup', mandatory=False)
1404 part = op.reply.newpart('reply:changegroup', mandatory=False)
1394 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1405 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1395 part.addparam('return', '%i' % ret, mandatory=False)
1406 part.addparam('return', '%i' % ret, mandatory=False)
1396 assert not inpart.read()
1407 assert not inpart.read()
1397
1408
1398 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1409 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1399 ['digest:%s' % k for k in util.DIGESTS.keys()])
1410 ['digest:%s' % k for k in util.DIGESTS.keys()])
1400 @parthandler('remote-changegroup', _remotechangegroupparams)
1411 @parthandler('remote-changegroup', _remotechangegroupparams)
1401 def handleremotechangegroup(op, inpart):
1412 def handleremotechangegroup(op, inpart):
1402 """apply a bundle10 on the repo, given an url and validation information
1413 """apply a bundle10 on the repo, given an url and validation information
1403
1414
1404 All the information about the remote bundle to import is given as
1415 All the information about the remote bundle to import is given as
1405 parameters. The parameters include:
1416 parameters. The parameters include:
1406 - url: the url to the bundle10.
1417 - url: the url to the bundle10.
1407 - size: the bundle10 file size. It is used to validate what was
1418 - size: the bundle10 file size. It is used to validate what was
1408 retrieved by the client matches the server knowledge about the bundle.
1419 retrieved by the client matches the server knowledge about the bundle.
1409 - digests: a space separated list of the digest types provided as
1420 - digests: a space separated list of the digest types provided as
1410 parameters.
1421 parameters.
1411 - digest:<digest-type>: the hexadecimal representation of the digest with
1422 - digest:<digest-type>: the hexadecimal representation of the digest with
1412 that name. Like the size, it is used to validate what was retrieved by
1423 that name. Like the size, it is used to validate what was retrieved by
1413 the client matches what the server knows about the bundle.
1424 the client matches what the server knows about the bundle.
1414
1425
1415 When multiple digest types are given, all of them are checked.
1426 When multiple digest types are given, all of them are checked.
1416 """
1427 """
1417 try:
1428 try:
1418 raw_url = inpart.params['url']
1429 raw_url = inpart.params['url']
1419 except KeyError:
1430 except KeyError:
1420 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1431 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1421 parsed_url = util.url(raw_url)
1432 parsed_url = util.url(raw_url)
1422 if parsed_url.scheme not in capabilities['remote-changegroup']:
1433 if parsed_url.scheme not in capabilities['remote-changegroup']:
1423 raise error.Abort(_('remote-changegroup does not support %s urls') %
1434 raise error.Abort(_('remote-changegroup does not support %s urls') %
1424 parsed_url.scheme)
1435 parsed_url.scheme)
1425
1436
1426 try:
1437 try:
1427 size = int(inpart.params['size'])
1438 size = int(inpart.params['size'])
1428 except ValueError:
1439 except ValueError:
1429 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1440 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1430 % 'size')
1441 % 'size')
1431 except KeyError:
1442 except KeyError:
1432 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1443 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1433
1444
1434 digests = {}
1445 digests = {}
1435 for typ in inpart.params.get('digests', '').split():
1446 for typ in inpart.params.get('digests', '').split():
1436 param = 'digest:%s' % typ
1447 param = 'digest:%s' % typ
1437 try:
1448 try:
1438 value = inpart.params[param]
1449 value = inpart.params[param]
1439 except KeyError:
1450 except KeyError:
1440 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1451 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1441 param)
1452 param)
1442 digests[typ] = value
1453 digests[typ] = value
1443
1454
1444 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1455 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1445
1456
1446 # Make sure we trigger a transaction creation
1457 # Make sure we trigger a transaction creation
1447 #
1458 #
1448 # The addchangegroup function will get a transaction object by itself, but
1459 # The addchangegroup function will get a transaction object by itself, but
1449 # we need to make sure we trigger the creation of a transaction object used
1460 # we need to make sure we trigger the creation of a transaction object used
1450 # for the whole processing scope.
1461 # for the whole processing scope.
1451 op.gettransaction()
1462 op.gettransaction()
1452 from . import exchange
1463 from . import exchange
1453 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1464 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1454 if not isinstance(cg, changegroup.cg1unpacker):
1465 if not isinstance(cg, changegroup.cg1unpacker):
1455 raise error.Abort(_('%s: not a bundle version 1.0') %
1466 raise error.Abort(_('%s: not a bundle version 1.0') %
1456 util.hidepassword(raw_url))
1467 util.hidepassword(raw_url))
1457 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1468 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1458 op.records.add('changegroup', {'return': ret})
1469 op.records.add('changegroup', {'return': ret})
1459 if op.reply is not None:
1470 if op.reply is not None:
1460 # This is definitely not the final form of this
1471 # This is definitely not the final form of this
1461 # return. But one needs to start somewhere.
1472 # return. But one needs to start somewhere.
1462 part = op.reply.newpart('reply:changegroup')
1473 part = op.reply.newpart('reply:changegroup')
1463 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1474 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1464 part.addparam('return', '%i' % ret, mandatory=False)
1475 part.addparam('return', '%i' % ret, mandatory=False)
1465 try:
1476 try:
1466 real_part.validate()
1477 real_part.validate()
1467 except error.Abort as e:
1478 except error.Abort as e:
1468 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1479 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1469 (util.hidepassword(raw_url), str(e)))
1480 (util.hidepassword(raw_url), str(e)))
1470 assert not inpart.read()
1481 assert not inpart.read()
1471
1482
1472 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1483 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1473 def handlereplychangegroup(op, inpart):
1484 def handlereplychangegroup(op, inpart):
1474 ret = int(inpart.params['return'])
1485 ret = int(inpart.params['return'])
1475 replyto = int(inpart.params['in-reply-to'])
1486 replyto = int(inpart.params['in-reply-to'])
1476 op.records.add('changegroup', {'return': ret}, replyto)
1487 op.records.add('changegroup', {'return': ret}, replyto)
1477
1488
1478 @parthandler('check:heads')
1489 @parthandler('check:heads')
1479 def handlecheckheads(op, inpart):
1490 def handlecheckheads(op, inpart):
1480 """check that head of the repo did not change
1491 """check that head of the repo did not change
1481
1492
1482 This is used to detect a push race when using unbundle.
1493 This is used to detect a push race when using unbundle.
1483 This replaces the "heads" argument of unbundle."""
1494 This replaces the "heads" argument of unbundle."""
1484 h = inpart.read(20)
1495 h = inpart.read(20)
1485 heads = []
1496 heads = []
1486 while len(h) == 20:
1497 while len(h) == 20:
1487 heads.append(h)
1498 heads.append(h)
1488 h = inpart.read(20)
1499 h = inpart.read(20)
1489 assert not h
1500 assert not h
1490 # Trigger a transaction so that we are guaranteed to have the lock now.
1501 # Trigger a transaction so that we are guaranteed to have the lock now.
1491 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1502 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1492 op.gettransaction()
1503 op.gettransaction()
1493 if sorted(heads) != sorted(op.repo.heads()):
1504 if sorted(heads) != sorted(op.repo.heads()):
1494 raise error.PushRaced('repository changed while pushing - '
1505 raise error.PushRaced('repository changed while pushing - '
1495 'please try again')
1506 'please try again')
1496
1507
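# Illustrative sketch only: the 'check:heads' payload read by the handler
# above is simply a concatenation of 20-byte binary head nodes. The values
# below are made-up placeholders, not real changeset hashes.
fakeheads = ['\x11' * 20, '\x22' * 20]
payload = ''.join(fakeheads)
assert len(payload) % 20 == 0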
1497 @parthandler('output')
1508 @parthandler('output')
1498 def handleoutput(op, inpart):
1509 def handleoutput(op, inpart):
1499 """forward output captured on the server to the client"""
1510 """forward output captured on the server to the client"""
1500 for line in inpart.read().splitlines():
1511 for line in inpart.read().splitlines():
1501 op.ui.status(_('remote: %s\n') % line)
1512 op.ui.status(_('remote: %s\n') % line)
1502
1513
1503 @parthandler('replycaps')
1514 @parthandler('replycaps')
1504 def handlereplycaps(op, inpart):
1515 def handlereplycaps(op, inpart):
1505 """Notify that a reply bundle should be created
1516 """Notify that a reply bundle should be created
1506
1517
1507 The payload contains the capabilities information for the reply"""
1518 The payload contains the capabilities information for the reply"""
1508 caps = decodecaps(inpart.read())
1519 caps = decodecaps(inpart.read())
1509 if op.reply is None:
1520 if op.reply is None:
1510 op.reply = bundle20(op.ui, caps)
1521 op.reply = bundle20(op.ui, caps)
1511
1522
1512 class AbortFromPart(error.Abort):
1523 class AbortFromPart(error.Abort):
1513 """Sub-class of Abort that denotes an error from a bundle2 part."""
1524 """Sub-class of Abort that denotes an error from a bundle2 part."""
1514
1525
1515 @parthandler('error:abort', ('message', 'hint'))
1526 @parthandler('error:abort', ('message', 'hint'))
1516 def handleerrorabort(op, inpart):
1527 def handleerrorabort(op, inpart):
1517 """Used to transmit abort error over the wire"""
1528 """Used to transmit abort error over the wire"""
1518 raise AbortFromPart(inpart.params['message'],
1529 raise AbortFromPart(inpart.params['message'],
1519 hint=inpart.params.get('hint'))
1530 hint=inpart.params.get('hint'))
1520
1531
1521 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1532 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1522 'in-reply-to'))
1533 'in-reply-to'))
1523 def handleerrorpushkey(op, inpart):
1534 def handleerrorpushkey(op, inpart):
1524 """Used to transmit failure of a mandatory pushkey over the wire"""
1535 """Used to transmit failure of a mandatory pushkey over the wire"""
1525 kwargs = {}
1536 kwargs = {}
1526 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1537 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1527 value = inpart.params.get(name)
1538 value = inpart.params.get(name)
1528 if value is not None:
1539 if value is not None:
1529 kwargs[name] = value
1540 kwargs[name] = value
1530 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1541 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1531
1542
1532 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1543 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1533 def handleerrorunsupportedcontent(op, inpart):
1544 def handleerrorunsupportedcontent(op, inpart):
1534 """Used to transmit unknown content error over the wire"""
1545 """Used to transmit unknown content error over the wire"""
1535 kwargs = {}
1546 kwargs = {}
1536 parttype = inpart.params.get('parttype')
1547 parttype = inpart.params.get('parttype')
1537 if parttype is not None:
1548 if parttype is not None:
1538 kwargs['parttype'] = parttype
1549 kwargs['parttype'] = parttype
1539 params = inpart.params.get('params')
1550 params = inpart.params.get('params')
1540 if params is not None:
1551 if params is not None:
1541 kwargs['params'] = params.split('\0')
1552 kwargs['params'] = params.split('\0')
1542
1553
1543 raise error.BundleUnknownFeatureError(**kwargs)
1554 raise error.BundleUnknownFeatureError(**kwargs)
1544
1555
1545 @parthandler('error:pushraced', ('message',))
1556 @parthandler('error:pushraced', ('message',))
1546 def handleerrorpushraced(op, inpart):
1557 def handleerrorpushraced(op, inpart):
1547 """Used to transmit push race error over the wire"""
1558 """Used to transmit push race error over the wire"""
1548 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1559 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1549
1560
1550 @parthandler('listkeys', ('namespace',))
1561 @parthandler('listkeys', ('namespace',))
1551 def handlelistkeys(op, inpart):
1562 def handlelistkeys(op, inpart):
1552 """retrieve pushkey namespace content stored in a bundle2"""
1563 """retrieve pushkey namespace content stored in a bundle2"""
1553 namespace = inpart.params['namespace']
1564 namespace = inpart.params['namespace']
1554 r = pushkey.decodekeys(inpart.read())
1565 r = pushkey.decodekeys(inpart.read())
1555 op.records.add('listkeys', (namespace, r))
1566 op.records.add('listkeys', (namespace, r))
1556
1567
1557 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1568 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1558 def handlepushkey(op, inpart):
1569 def handlepushkey(op, inpart):
1559 """process a pushkey request"""
1570 """process a pushkey request"""
1560 dec = pushkey.decode
1571 dec = pushkey.decode
1561 namespace = dec(inpart.params['namespace'])
1572 namespace = dec(inpart.params['namespace'])
1562 key = dec(inpart.params['key'])
1573 key = dec(inpart.params['key'])
1563 old = dec(inpart.params['old'])
1574 old = dec(inpart.params['old'])
1564 new = dec(inpart.params['new'])
1575 new = dec(inpart.params['new'])
1565 # Grab the transaction to ensure that we have the lock before performing the
1576 # Grab the transaction to ensure that we have the lock before performing the
1566 # pushkey.
1577 # pushkey.
1567 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1578 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1568 op.gettransaction()
1579 op.gettransaction()
1569 ret = op.repo.pushkey(namespace, key, old, new)
1580 ret = op.repo.pushkey(namespace, key, old, new)
1570 record = {'namespace': namespace,
1581 record = {'namespace': namespace,
1571 'key': key,
1582 'key': key,
1572 'old': old,
1583 'old': old,
1573 'new': new}
1584 'new': new}
1574 op.records.add('pushkey', record)
1585 op.records.add('pushkey', record)
1575 if op.reply is not None:
1586 if op.reply is not None:
1576 rpart = op.reply.newpart('reply:pushkey')
1587 rpart = op.reply.newpart('reply:pushkey')
1577 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1588 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1578 rpart.addparam('return', '%i' % ret, mandatory=False)
1589 rpart.addparam('return', '%i' % ret, mandatory=False)
1579 if inpart.mandatory and not ret:
1590 if inpart.mandatory and not ret:
1580 kwargs = {}
1591 kwargs = {}
1581 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1592 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1582 if key in inpart.params:
1593 if key in inpart.params:
1583 kwargs[key] = inpart.params[key]
1594 kwargs[key] = inpart.params[key]
1584 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1595 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1585
1596
1586 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1597 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1587 def handlepushkeyreply(op, inpart):
1598 def handlepushkeyreply(op, inpart):
1588 """retrieve the result of a pushkey request"""
1599 """retrieve the result of a pushkey request"""
1589 ret = int(inpart.params['return'])
1600 ret = int(inpart.params['return'])
1590 partid = int(inpart.params['in-reply-to'])
1601 partid = int(inpart.params['in-reply-to'])
1591 op.records.add('pushkey', {'return': ret}, partid)
1602 op.records.add('pushkey', {'return': ret}, partid)
1592
1603
1593 @parthandler('obsmarkers')
1604 @parthandler('obsmarkers')
1594 def handleobsmarker(op, inpart):
1605 def handleobsmarker(op, inpart):
1595 """add a stream of obsmarkers to the repo"""
1606 """add a stream of obsmarkers to the repo"""
1596 tr = op.gettransaction()
1607 tr = op.gettransaction()
1597 markerdata = inpart.read()
1608 markerdata = inpart.read()
1598 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1609 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1599 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1610 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1600 % len(markerdata))
1611 % len(markerdata))
1601 # The mergemarkers call will crash if marker creation is not enabled.
1612 # The mergemarkers call will crash if marker creation is not enabled.
1602 # we want to avoid this if the part is advisory.
1613 # we want to avoid this if the part is advisory.
1603 if not inpart.mandatory and op.repo.obsstore.readonly:
1614 if not inpart.mandatory and op.repo.obsstore.readonly:
1604 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1615 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1605 return
1616 return
1606 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1617 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1607 if new:
1618 if new:
1608 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1619 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1609 op.records.add('obsmarkers', {'new': new})
1620 op.records.add('obsmarkers', {'new': new})
1610 if op.reply is not None:
1621 if op.reply is not None:
1611 rpart = op.reply.newpart('reply:obsmarkers')
1622 rpart = op.reply.newpart('reply:obsmarkers')
1612 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1623 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1613 rpart.addparam('new', '%i' % new, mandatory=False)
1624 rpart.addparam('new', '%i' % new, mandatory=False)
1614
1625
1615
1626
1616 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1627 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1617 def handleobsmarkerreply(op, inpart):
1628 def handleobsmarkerreply(op, inpart):
1618 """retrieve the result of a pushkey request"""
1629 """retrieve the result of a pushkey request"""
1619 ret = int(inpart.params['new'])
1630 ret = int(inpart.params['new'])
1620 partid = int(inpart.params['in-reply-to'])
1631 partid = int(inpart.params['in-reply-to'])
1621 op.records.add('obsmarkers', {'new': ret}, partid)
1632 op.records.add('obsmarkers', {'new': ret}, partid)
1622
1633
1623 @parthandler('hgtagsfnodes')
1634 @parthandler('hgtagsfnodes')
1624 def handlehgtagsfnodes(op, inpart):
1635 def handlehgtagsfnodes(op, inpart):
1625 """Applies .hgtags fnodes cache entries to the local repo.
1636 """Applies .hgtags fnodes cache entries to the local repo.
1626
1637
1627 Payload is pairs of 20 byte changeset nodes and filenodes.
1638 Payload is pairs of 20 byte changeset nodes and filenodes.
1628 """
1639 """
1629 # Grab the transaction so we ensure that we have the lock at this point.
1640 # Grab the transaction so we ensure that we have the lock at this point.
1630 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1641 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1631 op.gettransaction()
1642 op.gettransaction()
1632 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1643 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1633
1644
1634 count = 0
1645 count = 0
1635 while True:
1646 while True:
1636 node = inpart.read(20)
1647 node = inpart.read(20)
1637 fnode = inpart.read(20)
1648 fnode = inpart.read(20)
1638 if len(node) < 20 or len(fnode) < 20:
1649 if len(node) < 20 or len(fnode) < 20:
1639 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1650 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1640 break
1651 break
1641 cache.setfnode(node, fnode)
1652 cache.setfnode(node, fnode)
1642 count += 1
1653 count += 1
1643
1654
1644 cache.write()
1655 cache.write()
1645 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1656 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)