bundle2: implement a basic __repr__ for bundle2 part...
Pierre-Yves David
r30872:1f51b465 default
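The `__repr__` added below (new lines 865-868) is meant to make bundle2 parts easier to identify in debug output. A rough sketch of the kind of string it yields for a freshly created part (the module path is spelled out for illustration and the hexadecimal address will differ):

    from mercurial import bundle2
    part = bundle2.bundlepart('output', data='hello')
    print repr(part)
    # e.g. <mercurial.bundle2.bundlepart object at 7f23a9c815d0; id: None; type: output; mandatory: True>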
@@ -1,1619 +1,1624 @@
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomic packet to transmit a set of
9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 payloads in an application agnostic way. It consists of a sequence of "parts"
10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is structured as follows
17 The format is structured as follows
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 The binary format
24 The binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 The binary format is as follows
32 The binary format is as follows
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with a value
43 The blob contains a space separated list of parameters. Parameters with a value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty names are forbidden.
46 Empty names are forbidden.
47
47
48 Names MUST start with a letter. If the first letter is lower case, the
48 Names MUST start with a letter. If the first letter is lower case, the
49 parameter is advisory and can be safely ignored. However, when the first
49 parameter is advisory and can be safely ignored. However, when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if it is not able to process it.
51 stop if it is not able to process it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any application level options MUST go into a bundle2 part instead.
60 Any application level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 The binary format is as follows
65 The binary format is as follows
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part header. When the header is empty
69 The total number of Bytes used by the part header. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two pieces of
74 The header defines how to interpret the part. It contains two pieces of
75 data: the part type and the part parameters.
75 data: the part type and the part parameters.
76
76
77 The part type is used to route the part to an application level handler
77 The part type is used to route the part to an application level handler
78 that can interpret the payload.
78 that can interpret the payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is as follows
84 The binary format of the header is as follows
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couples of bytes, where N is the total number of parameters. Each
105 N couples of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters come first, then the advisory ones.
114 Mandatory parameters come first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
123 `chunksize` says). The payload part is concluded by a zero size chunk.
123 `chunksize` says). The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handlers are
134 Each part is processed in order using a "part handler". Handlers are
135 registered for a certain part type.
135 registered for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the parts read after an abort are processed. In the
143 channel usable. But none of the parts read after an abort are processed. In the
144 future, dropping the stream may become an option for channels we do not care to
144 future, dropping the stream may become an option for channels we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 from __future__ import absolute_import
148 from __future__ import absolute_import
149
149
150 import errno
150 import errno
151 import re
151 import re
152 import string
152 import string
153 import struct
153 import struct
154 import sys
154 import sys
155
155
156 from .i18n import _
156 from .i18n import _
157 from . import (
157 from . import (
158 changegroup,
158 changegroup,
159 error,
159 error,
160 obsolete,
160 obsolete,
161 pushkey,
161 pushkey,
162 pycompat,
162 pycompat,
163 tags,
163 tags,
164 url,
164 url,
165 util,
165 util,
166 )
166 )
167
167
168 urlerr = util.urlerr
168 urlerr = util.urlerr
169 urlreq = util.urlreq
169 urlreq = util.urlreq
170
170
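# Illustrative sketch (not part of the original module): decoding a stream
# level parameter blob in the format described by the module docstring above.
# The real parsing lives in unbundle20._processallparams further down; the
# helper name here is made up.
def _examplestreamparams(blob):
    """return a {name: value} dict from a space separated, urlquoted blob"""
    params = {}
    for entry in blob.split(' '):
        if '=' in entry:
            name, value = entry.split('=', 1)
        else:
            name, value = entry, None
        name = urlreq.unquote(name)
        if value is not None:
            value = urlreq.unquote(value)
        # a name starting with an upper case letter marks a mandatory parameter
        params[name] = value
    return params

# e.g. _examplestreamparams('Compression=BZ') == {'Compression': 'BZ'}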
171 _pack = struct.pack
171 _pack = struct.pack
172 _unpack = struct.unpack
172 _unpack = struct.unpack
173
173
174 _fstreamparamsize = '>i'
174 _fstreamparamsize = '>i'
175 _fpartheadersize = '>i'
175 _fpartheadersize = '>i'
176 _fparttypesize = '>B'
176 _fparttypesize = '>B'
177 _fpartid = '>I'
177 _fpartid = '>I'
178 _fpayloadsize = '>i'
178 _fpayloadsize = '>i'
179 _fpartparamcount = '>BB'
179 _fpartparamcount = '>BB'
180
180
181 preferedchunksize = 4096
181 preferedchunksize = 4096
182
182
183 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
183 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
184
184
185 def outdebug(ui, message):
185 def outdebug(ui, message):
186 """debug regarding output stream (bundling)"""
186 """debug regarding output stream (bundling)"""
187 if ui.configbool('devel', 'bundle2.debug', False):
187 if ui.configbool('devel', 'bundle2.debug', False):
188 ui.debug('bundle2-output: %s\n' % message)
188 ui.debug('bundle2-output: %s\n' % message)
189
189
190 def indebug(ui, message):
190 def indebug(ui, message):
191 """debug on input stream (unbundling)"""
191 """debug on input stream (unbundling)"""
192 if ui.configbool('devel', 'bundle2.debug', False):
192 if ui.configbool('devel', 'bundle2.debug', False):
193 ui.debug('bundle2-input: %s\n' % message)
193 ui.debug('bundle2-input: %s\n' % message)
194
194
195 def validateparttype(parttype):
195 def validateparttype(parttype):
196 """raise ValueError if a parttype contains invalid character"""
196 """raise ValueError if a parttype contains invalid character"""
197 if _parttypeforbidden.search(parttype):
197 if _parttypeforbidden.search(parttype):
198 raise ValueError(parttype)
198 raise ValueError(parttype)
199
199
200 def _makefpartparamsizes(nbparams):
200 def _makefpartparamsizes(nbparams):
201 """return a struct format to read part parameter sizes
201 """return a struct format to read part parameter sizes
202
202
203 The number parameters is variable so we need to build that format
203 The number parameters is variable so we need to build that format
204 dynamically.
204 dynamically.
205 """
205 """
206 return '>'+('BB'*nbparams)
206 return '>'+('BB'*nbparams)
207
207
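# Quick illustration (sketch, not part of the original module): the framing
# formats above are plain struct formats, so header pieces can be packed and
# unpacked directly. For a part with two parameters the size prefix expands
# to '>BBBB', one (key size, value size) byte pair per parameter.
_examplefmt = _makefpartparamsizes(2)            # '>BBBB'
_exampleblob = _pack(_examplefmt, 3, 2, 4, 0)    # two (key size, value size) pairs
assert _unpack(_examplefmt, _exampleblob) == (3, 2, 4, 0)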
208 parthandlermapping = {}
208 parthandlermapping = {}
209
209
210 def parthandler(parttype, params=()):
210 def parthandler(parttype, params=()):
211 """decorator that register a function as a bundle2 part handler
211 """decorator that register a function as a bundle2 part handler
212
212
213 eg::
213 eg::
214
214
215 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
215 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
216 def myparttypehandler(...):
216 def myparttypehandler(...):
217 '''process a part of type "my part".'''
217 '''process a part of type "my part".'''
218 ...
218 ...
219 """
219 """
220 validateparttype(parttype)
220 validateparttype(parttype)
221 def _decorator(func):
221 def _decorator(func):
222 lparttype = parttype.lower() # enforce lower case matching.
222 lparttype = parttype.lower() # enforce lower case matching.
223 assert lparttype not in parthandlermapping
223 assert lparttype not in parthandlermapping
224 parthandlermapping[lparttype] = func
224 parthandlermapping[lparttype] = func
225 func.params = frozenset(params)
225 func.params = frozenset(params)
226 return func
226 return func
227 return _decorator
227 return _decorator
228
228
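# Sketch of how an extension might register a part handler with the decorator
# above. The part type, its parameter and the behavior are made up for
# illustration; a fully lower case type makes the part advisory.
@parthandler('x-example:ping', ('message',))
def _handleexampleping(op, part):
    """echo the advisory 'message' parameter of the part in debug output"""
    op.ui.debug('x-example:ping: %s\n' % part.params.get('message', ''))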
229 class unbundlerecords(object):
229 class unbundlerecords(object):
230 """keep record of what happens during and unbundle
230 """keep record of what happens during and unbundle
231
231
232 New records are added using `records.add('cat', obj)`. Where 'cat' is a
232 New records are added using `records.add('cat', obj)`. Where 'cat' is a
233 category of record and obj is an arbitrary object.
233 category of record and obj is an arbitrary object.
234
234
235 `records['cat']` will return all entries of this category 'cat'.
235 `records['cat']` will return all entries of this category 'cat'.
236
236
237 Iterating on the object itself will yield `('category', obj)` tuples
237 Iterating on the object itself will yield `('category', obj)` tuples
238 for all entries.
238 for all entries.
239
239
240 All iterations happens in chronological order.
240 All iterations happens in chronological order.
241 """
241 """
242
242
243 def __init__(self):
243 def __init__(self):
244 self._categories = {}
244 self._categories = {}
245 self._sequences = []
245 self._sequences = []
246 self._replies = {}
246 self._replies = {}
247
247
248 def add(self, category, entry, inreplyto=None):
248 def add(self, category, entry, inreplyto=None):
249 """add a new record of a given category.
249 """add a new record of a given category.
250
250
251 The entry can then be retrieved in the list returned by
251 The entry can then be retrieved in the list returned by
252 self['category']."""
252 self['category']."""
253 self._categories.setdefault(category, []).append(entry)
253 self._categories.setdefault(category, []).append(entry)
254 self._sequences.append((category, entry))
254 self._sequences.append((category, entry))
255 if inreplyto is not None:
255 if inreplyto is not None:
256 self.getreplies(inreplyto).add(category, entry)
256 self.getreplies(inreplyto).add(category, entry)
257
257
258 def getreplies(self, partid):
258 def getreplies(self, partid):
259 """get the records that are replies to a specific part"""
259 """get the records that are replies to a specific part"""
260 return self._replies.setdefault(partid, unbundlerecords())
260 return self._replies.setdefault(partid, unbundlerecords())
261
261
262 def __getitem__(self, cat):
262 def __getitem__(self, cat):
263 return tuple(self._categories.get(cat, ()))
263 return tuple(self._categories.get(cat, ()))
264
264
265 def __iter__(self):
265 def __iter__(self):
266 return iter(self._sequences)
266 return iter(self._sequences)
267
267
268 def __len__(self):
268 def __len__(self):
269 return len(self._sequences)
269 return len(self._sequences)
270
270
271 def __nonzero__(self):
271 def __nonzero__(self):
272 return bool(self._sequences)
272 return bool(self._sequences)
273
273
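# Sketch of the records API above, using made-up categories and entries:
_examplerecords = unbundlerecords()
_examplerecords.add('changegroup', {'return': 1})
_examplerecords.add('pushkey', {'namespace': 'phases'}, inreplyto=0)
assert _examplerecords['changegroup'] == ({'return': 1},)
assert list(_examplerecords.getreplies(0)) == [('pushkey', {'namespace': 'phases'})]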
274 class bundleoperation(object):
274 class bundleoperation(object):
275 """an object that represents a single bundling process
275 """an object that represents a single bundling process
276
276
277 Its purpose is to carry unbundle-related objects and states.
277 Its purpose is to carry unbundle-related objects and states.
278
278
279 A new object should be created at the beginning of each bundle processing.
279 A new object should be created at the beginning of each bundle processing.
280 The object is to be returned by the processing function.
280 The object is to be returned by the processing function.
281
281
282 The object has very little content now; it will ultimately contain:
282 The object has very little content now; it will ultimately contain:
283 * an access to the repo the bundle is applied to,
283 * an access to the repo the bundle is applied to,
284 * a ui object,
284 * a ui object,
285 * a way to retrieve a transaction to add changes to the repo,
285 * a way to retrieve a transaction to add changes to the repo,
286 * a way to record the result of processing each part,
286 * a way to record the result of processing each part,
287 * a way to construct a bundle response when applicable.
287 * a way to construct a bundle response when applicable.
288 """
288 """
289
289
290 def __init__(self, repo, transactiongetter, captureoutput=True):
290 def __init__(self, repo, transactiongetter, captureoutput=True):
291 self.repo = repo
291 self.repo = repo
292 self.ui = repo.ui
292 self.ui = repo.ui
293 self.records = unbundlerecords()
293 self.records = unbundlerecords()
294 self.gettransaction = transactiongetter
294 self.gettransaction = transactiongetter
295 self.reply = None
295 self.reply = None
296 self.captureoutput = captureoutput
296 self.captureoutput = captureoutput
297
297
298 class TransactionUnavailable(RuntimeError):
298 class TransactionUnavailable(RuntimeError):
299 pass
299 pass
300
300
301 def _notransaction():
301 def _notransaction():
302 """default method to get a transaction while processing a bundle
302 """default method to get a transaction while processing a bundle
303
303
304 Raise an exception to highlight the fact that no transaction was expected
304 Raise an exception to highlight the fact that no transaction was expected
305 to be created"""
305 to be created"""
306 raise TransactionUnavailable()
306 raise TransactionUnavailable()
307
307
308 def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
308 def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
309 # transform me into unbundler.apply() as soon as the freeze is lifted
309 # transform me into unbundler.apply() as soon as the freeze is lifted
310 tr.hookargs['bundle2'] = '1'
310 tr.hookargs['bundle2'] = '1'
311 if source is not None and 'source' not in tr.hookargs:
311 if source is not None and 'source' not in tr.hookargs:
312 tr.hookargs['source'] = source
312 tr.hookargs['source'] = source
313 if url is not None and 'url' not in tr.hookargs:
313 if url is not None and 'url' not in tr.hookargs:
314 tr.hookargs['url'] = url
314 tr.hookargs['url'] = url
315 return processbundle(repo, unbundler, lambda: tr, op=op)
315 return processbundle(repo, unbundler, lambda: tr, op=op)
316
316
317 def processbundle(repo, unbundler, transactiongetter=None, op=None):
317 def processbundle(repo, unbundler, transactiongetter=None, op=None):
318 """This function process a bundle, apply effect to/from a repo
318 """This function process a bundle, apply effect to/from a repo
319
319
320 It iterates over each part then searches for and uses the proper handling
320 It iterates over each part then searches for and uses the proper handling
321 code to process the part. Parts are processed in order.
321 code to process the part. Parts are processed in order.
322
322
323 Unknown Mandatory part will abort the process.
323 Unknown Mandatory part will abort the process.
324
324
325 It is temporarily possible to provide a prebuilt bundleoperation to the
325 It is temporarily possible to provide a prebuilt bundleoperation to the
326 function. This is used to ensure output is properly propagated in case of
326 function. This is used to ensure output is properly propagated in case of
327 an error during the unbundling. This output capturing part will likely be
327 an error during the unbundling. This output capturing part will likely be
328 reworked and this ability will probably go away in the process.
328 reworked and this ability will probably go away in the process.
329 """
329 """
330 if op is None:
330 if op is None:
331 if transactiongetter is None:
331 if transactiongetter is None:
332 transactiongetter = _notransaction
332 transactiongetter = _notransaction
333 op = bundleoperation(repo, transactiongetter)
333 op = bundleoperation(repo, transactiongetter)
334 # todo:
334 # todo:
335 # - replace this with an init function soon.
335 # - replace this with an init function soon.
336 # - exception catching
336 # - exception catching
337 unbundler.params
337 unbundler.params
338 if repo.ui.debugflag:
338 if repo.ui.debugflag:
339 msg = ['bundle2-input-bundle:']
339 msg = ['bundle2-input-bundle:']
340 if unbundler.params:
340 if unbundler.params:
341 msg.append(' %i params')
341 msg.append(' %i params')
342 if op.gettransaction is None:
342 if op.gettransaction is None:
343 msg.append(' no-transaction')
343 msg.append(' no-transaction')
344 else:
344 else:
345 msg.append(' with-transaction')
345 msg.append(' with-transaction')
346 msg.append('\n')
346 msg.append('\n')
347 repo.ui.debug(''.join(msg))
347 repo.ui.debug(''.join(msg))
348 iterparts = enumerate(unbundler.iterparts())
348 iterparts = enumerate(unbundler.iterparts())
349 part = None
349 part = None
350 nbpart = 0
350 nbpart = 0
351 try:
351 try:
352 for nbpart, part in iterparts:
352 for nbpart, part in iterparts:
353 _processpart(op, part)
353 _processpart(op, part)
354 except Exception as exc:
354 except Exception as exc:
355 for nbpart, part in iterparts:
355 for nbpart, part in iterparts:
356 # consume the bundle content
356 # consume the bundle content
357 part.seek(0, 2)
357 part.seek(0, 2)
358 # Small hack to let caller code distinguish exceptions from bundle2
358 # Small hack to let caller code distinguish exceptions from bundle2
359 # processing from processing the old format. This is mostly
359 # processing from processing the old format. This is mostly
360 # needed to handle different return codes to unbundle according to the
360 # needed to handle different return codes to unbundle according to the
361 # type of bundle. We should probably clean up or drop this return code
361 # type of bundle. We should probably clean up or drop this return code
362 # craziness in a future version.
362 # craziness in a future version.
363 exc.duringunbundle2 = True
363 exc.duringunbundle2 = True
364 salvaged = []
364 salvaged = []
365 replycaps = None
365 replycaps = None
366 if op.reply is not None:
366 if op.reply is not None:
367 salvaged = op.reply.salvageoutput()
367 salvaged = op.reply.salvageoutput()
368 replycaps = op.reply.capabilities
368 replycaps = op.reply.capabilities
369 exc._replycaps = replycaps
369 exc._replycaps = replycaps
370 exc._bundle2salvagedoutput = salvaged
370 exc._bundle2salvagedoutput = salvaged
371 raise
371 raise
372 finally:
372 finally:
373 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
373 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
374
374
375 return op
375 return op
376
376
377 def _processpart(op, part):
377 def _processpart(op, part):
378 """process a single part from a bundle
378 """process a single part from a bundle
379
379
380 The part is guaranteed to have been fully consumed when the function exits
380 The part is guaranteed to have been fully consumed when the function exits
381 (even if an exception is raised)."""
381 (even if an exception is raised)."""
382 status = 'unknown' # used by debug output
382 status = 'unknown' # used by debug output
383 hardabort = False
383 hardabort = False
384 try:
384 try:
385 try:
385 try:
386 handler = parthandlermapping.get(part.type)
386 handler = parthandlermapping.get(part.type)
387 if handler is None:
387 if handler is None:
388 status = 'unsupported-type'
388 status = 'unsupported-type'
389 raise error.BundleUnknownFeatureError(parttype=part.type)
389 raise error.BundleUnknownFeatureError(parttype=part.type)
390 indebug(op.ui, 'found a handler for part %r' % part.type)
390 indebug(op.ui, 'found a handler for part %r' % part.type)
391 unknownparams = part.mandatorykeys - handler.params
391 unknownparams = part.mandatorykeys - handler.params
392 if unknownparams:
392 if unknownparams:
393 unknownparams = list(unknownparams)
393 unknownparams = list(unknownparams)
394 unknownparams.sort()
394 unknownparams.sort()
395 status = 'unsupported-params (%s)' % unknownparams
395 status = 'unsupported-params (%s)' % unknownparams
396 raise error.BundleUnknownFeatureError(parttype=part.type,
396 raise error.BundleUnknownFeatureError(parttype=part.type,
397 params=unknownparams)
397 params=unknownparams)
398 status = 'supported'
398 status = 'supported'
399 except error.BundleUnknownFeatureError as exc:
399 except error.BundleUnknownFeatureError as exc:
400 if part.mandatory: # mandatory parts
400 if part.mandatory: # mandatory parts
401 raise
401 raise
402 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
402 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
403 return # skip to part processing
403 return # skip to part processing
404 finally:
404 finally:
405 if op.ui.debugflag:
405 if op.ui.debugflag:
406 msg = ['bundle2-input-part: "%s"' % part.type]
406 msg = ['bundle2-input-part: "%s"' % part.type]
407 if not part.mandatory:
407 if not part.mandatory:
408 msg.append(' (advisory)')
408 msg.append(' (advisory)')
409 nbmp = len(part.mandatorykeys)
409 nbmp = len(part.mandatorykeys)
410 nbap = len(part.params) - nbmp
410 nbap = len(part.params) - nbmp
411 if nbmp or nbap:
411 if nbmp or nbap:
412 msg.append(' (params:')
412 msg.append(' (params:')
413 if nbmp:
413 if nbmp:
414 msg.append(' %i mandatory' % nbmp)
414 msg.append(' %i mandatory' % nbmp)
415 if nbap:
415 if nbap:
416 msg.append(' %i advisory' % nbap)
416 msg.append(' %i advisory' % nbap)
417 msg.append(')')
417 msg.append(')')
418 msg.append(' %s\n' % status)
418 msg.append(' %s\n' % status)
419 op.ui.debug(''.join(msg))
419 op.ui.debug(''.join(msg))
420
420
421 # handler is called outside the above try block so that we don't
421 # handler is called outside the above try block so that we don't
422 # risk catching KeyErrors from anything other than the
422 # risk catching KeyErrors from anything other than the
423 # parthandlermapping lookup (any KeyError raised by handler()
423 # parthandlermapping lookup (any KeyError raised by handler()
424 # itself represents a defect of a different variety).
424 # itself represents a defect of a different variety).
425 output = None
425 output = None
426 if op.captureoutput and op.reply is not None:
426 if op.captureoutput and op.reply is not None:
427 op.ui.pushbuffer(error=True, subproc=True)
427 op.ui.pushbuffer(error=True, subproc=True)
428 output = ''
428 output = ''
429 try:
429 try:
430 handler(op, part)
430 handler(op, part)
431 finally:
431 finally:
432 if output is not None:
432 if output is not None:
433 output = op.ui.popbuffer()
433 output = op.ui.popbuffer()
434 if output:
434 if output:
435 outpart = op.reply.newpart('output', data=output,
435 outpart = op.reply.newpart('output', data=output,
436 mandatory=False)
436 mandatory=False)
437 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
437 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
438 # If exiting or interrupted, do not attempt to seek the stream in the
438 # If exiting or interrupted, do not attempt to seek the stream in the
439 # finally block below. This makes abort faster.
439 # finally block below. This makes abort faster.
440 except (SystemExit, KeyboardInterrupt):
440 except (SystemExit, KeyboardInterrupt):
441 hardabort = True
441 hardabort = True
442 raise
442 raise
443 finally:
443 finally:
444 # consume the part content to not corrupt the stream.
444 # consume the part content to not corrupt the stream.
445 if not hardabort:
445 if not hardabort:
446 part.seek(0, 2)
446 part.seek(0, 2)
447
447
448
448
449 def decodecaps(blob):
449 def decodecaps(blob):
450 """decode a bundle2 caps bytes blob into a dictionary
450 """decode a bundle2 caps bytes blob into a dictionary
451
451
452 The blob is a list of capabilities (one per line)
452 The blob is a list of capabilities (one per line)
453 Capabilities may have values using a line of the form::
453 Capabilities may have values using a line of the form::
454
454
455 capability=value1,value2,value3
455 capability=value1,value2,value3
456
456
457 The values are always a list."""
457 The values are always a list."""
458 caps = {}
458 caps = {}
459 for line in blob.splitlines():
459 for line in blob.splitlines():
460 if not line:
460 if not line:
461 continue
461 continue
462 if '=' not in line:
462 if '=' not in line:
463 key, vals = line, ()
463 key, vals = line, ()
464 else:
464 else:
465 key, vals = line.split('=', 1)
465 key, vals = line.split('=', 1)
466 vals = vals.split(',')
466 vals = vals.split(',')
467 key = urlreq.unquote(key)
467 key = urlreq.unquote(key)
468 vals = [urlreq.unquote(v) for v in vals]
468 vals = [urlreq.unquote(v) for v in vals]
469 caps[key] = vals
469 caps[key] = vals
470 return caps
470 return caps
471
471
472 def encodecaps(caps):
472 def encodecaps(caps):
473 """encode a bundle2 caps dictionary into a bytes blob"""
473 """encode a bundle2 caps dictionary into a bytes blob"""
474 chunks = []
474 chunks = []
475 for ca in sorted(caps):
475 for ca in sorted(caps):
476 vals = caps[ca]
476 vals = caps[ca]
477 ca = urlreq.quote(ca)
477 ca = urlreq.quote(ca)
478 vals = [urlreq.quote(v) for v in vals]
478 vals = [urlreq.quote(v) for v in vals]
479 if vals:
479 if vals:
480 ca = "%s=%s" % (ca, ','.join(vals))
480 ca = "%s=%s" % (ca, ','.join(vals))
481 chunks.append(ca)
481 chunks.append(ca)
482 return '\n'.join(chunks)
482 return '\n'.join(chunks)
483
483
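# Sketch: the two helpers above round-trip a capabilities dictionary through
# the textual blob exchanged between peers (values always come back as lists).
_examplecaps = {'HG20': [], 'changegroup': ['01', '02']}
_examplecapsblob = encodecaps(_examplecaps)      # 'HG20\nchangegroup=01,02'
assert decodecaps(_examplecapsblob) == {'HG20': [], 'changegroup': ['01', '02']}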
484 bundletypes = {
484 bundletypes = {
485 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
485 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
486 # since the unification ssh accepts a header but there
486 # since the unification ssh accepts a header but there
487 # is no capability signaling it.
487 # is no capability signaling it.
488 "HG20": (), # special-cased below
488 "HG20": (), # special-cased below
489 "HG10UN": ("HG10UN", 'UN'),
489 "HG10UN": ("HG10UN", 'UN'),
490 "HG10BZ": ("HG10", 'BZ'),
490 "HG10BZ": ("HG10", 'BZ'),
491 "HG10GZ": ("HG10GZ", 'GZ'),
491 "HG10GZ": ("HG10GZ", 'GZ'),
492 }
492 }
493
493
494 # hgweb uses this list to communicate its preferred type
494 # hgweb uses this list to communicate its preferred type
495 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
495 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
496
496
497 class bundle20(object):
497 class bundle20(object):
498 """represent an outgoing bundle2 container
498 """represent an outgoing bundle2 container
499
499
500 Use the `addparam` method to add stream level parameter. and `newpart` to
500 Use the `addparam` method to add stream level parameter. and `newpart` to
501 populate it. Then call `getchunks` to retrieve all the binary chunks of
501 populate it. Then call `getchunks` to retrieve all the binary chunks of
502 data that compose the bundle2 container."""
502 data that compose the bundle2 container."""
503
503
504 _magicstring = 'HG20'
504 _magicstring = 'HG20'
505
505
506 def __init__(self, ui, capabilities=()):
506 def __init__(self, ui, capabilities=()):
507 self.ui = ui
507 self.ui = ui
508 self._params = []
508 self._params = []
509 self._parts = []
509 self._parts = []
510 self.capabilities = dict(capabilities)
510 self.capabilities = dict(capabilities)
511 self._compengine = util.compengines.forbundletype('UN')
511 self._compengine = util.compengines.forbundletype('UN')
512 self._compopts = None
512 self._compopts = None
513
513
514 def setcompression(self, alg, compopts=None):
514 def setcompression(self, alg, compopts=None):
515 """setup core part compression to <alg>"""
515 """setup core part compression to <alg>"""
516 if alg in (None, 'UN'):
516 if alg in (None, 'UN'):
517 return
517 return
518 assert not any(n.lower() == 'compression' for n, v in self._params)
518 assert not any(n.lower() == 'compression' for n, v in self._params)
519 self.addparam('Compression', alg)
519 self.addparam('Compression', alg)
520 self._compengine = util.compengines.forbundletype(alg)
520 self._compengine = util.compengines.forbundletype(alg)
521 self._compopts = compopts
521 self._compopts = compopts
522
522
523 @property
523 @property
524 def nbparts(self):
524 def nbparts(self):
525 """total number of parts added to the bundler"""
525 """total number of parts added to the bundler"""
526 return len(self._parts)
526 return len(self._parts)
527
527
528 # methods used to define the bundle2 content
528 # methods used to define the bundle2 content
529 def addparam(self, name, value=None):
529 def addparam(self, name, value=None):
530 """add a stream level parameter"""
530 """add a stream level parameter"""
531 if not name:
531 if not name:
532 raise ValueError('empty parameter name')
532 raise ValueError('empty parameter name')
533 if name[0] not in string.letters:
533 if name[0] not in string.letters:
534 raise ValueError('non letter first character: %r' % name)
534 raise ValueError('non letter first character: %r' % name)
535 self._params.append((name, value))
535 self._params.append((name, value))
536
536
537 def addpart(self, part):
537 def addpart(self, part):
538 """add a new part to the bundle2 container
538 """add a new part to the bundle2 container
539
539
540 Parts contains the actual applicative payload."""
540 Parts contains the actual applicative payload."""
541 assert part.id is None
541 assert part.id is None
542 part.id = len(self._parts) # very cheap counter
542 part.id = len(self._parts) # very cheap counter
543 self._parts.append(part)
543 self._parts.append(part)
544
544
545 def newpart(self, typeid, *args, **kwargs):
545 def newpart(self, typeid, *args, **kwargs):
546 """create a new part and add it to the containers
546 """create a new part and add it to the containers
547
547
548 The part is directly added to the container. For now, this means
548 The part is directly added to the container. For now, this means
549 that any failure to properly initialize the part after calling
549 that any failure to properly initialize the part after calling
550 ``newpart`` should result in a failure of the whole bundling process.
550 ``newpart`` should result in a failure of the whole bundling process.
551
551
552 You can still fall back to manually create and add if you need better
552 You can still fall back to manually create and add if you need better
553 control."""
553 control."""
554 part = bundlepart(typeid, *args, **kwargs)
554 part = bundlepart(typeid, *args, **kwargs)
555 self.addpart(part)
555 self.addpart(part)
556 return part
556 return part
557
557
558 # methods used to generate the bundle2 stream
558 # methods used to generate the bundle2 stream
559 def getchunks(self):
559 def getchunks(self):
560 if self.ui.debugflag:
560 if self.ui.debugflag:
561 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
561 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
562 if self._params:
562 if self._params:
563 msg.append(' (%i params)' % len(self._params))
563 msg.append(' (%i params)' % len(self._params))
564 msg.append(' %i parts total\n' % len(self._parts))
564 msg.append(' %i parts total\n' % len(self._parts))
565 self.ui.debug(''.join(msg))
565 self.ui.debug(''.join(msg))
566 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
566 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
567 yield self._magicstring
567 yield self._magicstring
568 param = self._paramchunk()
568 param = self._paramchunk()
569 outdebug(self.ui, 'bundle parameter: %s' % param)
569 outdebug(self.ui, 'bundle parameter: %s' % param)
570 yield _pack(_fstreamparamsize, len(param))
570 yield _pack(_fstreamparamsize, len(param))
571 if param:
571 if param:
572 yield param
572 yield param
573 for chunk in self._compengine.compressstream(self._getcorechunk(),
573 for chunk in self._compengine.compressstream(self._getcorechunk(),
574 self._compopts):
574 self._compopts):
575 yield chunk
575 yield chunk
576
576
577 def _paramchunk(self):
577 def _paramchunk(self):
578 """return a encoded version of all stream parameters"""
578 """return a encoded version of all stream parameters"""
579 blocks = []
579 blocks = []
580 for par, value in self._params:
580 for par, value in self._params:
581 par = urlreq.quote(par)
581 par = urlreq.quote(par)
582 if value is not None:
582 if value is not None:
583 value = urlreq.quote(value)
583 value = urlreq.quote(value)
584 par = '%s=%s' % (par, value)
584 par = '%s=%s' % (par, value)
585 blocks.append(par)
585 blocks.append(par)
586 return ' '.join(blocks)
586 return ' '.join(blocks)
587
587
588 def _getcorechunk(self):
588 def _getcorechunk(self):
589 """yield chunk for the core part of the bundle
589 """yield chunk for the core part of the bundle
590
590
591 (all but headers and parameters)"""
591 (all but headers and parameters)"""
592 outdebug(self.ui, 'start of parts')
592 outdebug(self.ui, 'start of parts')
593 for part in self._parts:
593 for part in self._parts:
594 outdebug(self.ui, 'bundle part: "%s"' % part.type)
594 outdebug(self.ui, 'bundle part: "%s"' % part.type)
595 for chunk in part.getchunks(ui=self.ui):
595 for chunk in part.getchunks(ui=self.ui):
596 yield chunk
596 yield chunk
597 outdebug(self.ui, 'end of bundle')
597 outdebug(self.ui, 'end of bundle')
598 yield _pack(_fpartheadersize, 0)
598 yield _pack(_fpartheadersize, 0)
599
599
600
600
601 def salvageoutput(self):
601 def salvageoutput(self):
602 """return a list with a copy of all output parts in the bundle
602 """return a list with a copy of all output parts in the bundle
603
603
604 This is meant to be used during error handling to make sure we preserve
604 This is meant to be used during error handling to make sure we preserve
605 server output"""
605 server output"""
606 salvaged = []
606 salvaged = []
607 for part in self._parts:
607 for part in self._parts:
608 if part.type.startswith('output'):
608 if part.type.startswith('output'):
609 salvaged.append(part.copy())
609 salvaged.append(part.copy())
610 return salvaged
610 return salvaged
611
611
612
612
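# Sketch of driving the bundler above (commented out because it needs a live
# Mercurial ui object; the file name and part content are made up):
#
#   bundler = bundle20(ui)
#   bundler.addparam('somestreamparam')              # advisory stream parameter
#   part = bundler.newpart('output', data='hello', mandatory=False)
#   part.addparam('in-reply-to', '1', mandatory=False)
#   with open('example.hg2', 'wb') as fp:
#       for chunk in bundler.getchunks():
#           fp.write(chunk)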
613 class unpackermixin(object):
613 class unpackermixin(object):
614 """A mixin to extract bytes and struct data from a stream"""
614 """A mixin to extract bytes and struct data from a stream"""
615
615
616 def __init__(self, fp):
616 def __init__(self, fp):
617 self._fp = fp
617 self._fp = fp
618 self._seekable = (util.safehasattr(fp, 'seek') and
618 self._seekable = (util.safehasattr(fp, 'seek') and
619 util.safehasattr(fp, 'tell'))
619 util.safehasattr(fp, 'tell'))
620
620
621 def _unpack(self, format):
621 def _unpack(self, format):
622 """unpack this struct format from the stream"""
622 """unpack this struct format from the stream"""
623 data = self._readexact(struct.calcsize(format))
623 data = self._readexact(struct.calcsize(format))
624 return _unpack(format, data)
624 return _unpack(format, data)
625
625
626 def _readexact(self, size):
626 def _readexact(self, size):
627 """read exactly <size> bytes from the stream"""
627 """read exactly <size> bytes from the stream"""
628 return changegroup.readexactly(self._fp, size)
628 return changegroup.readexactly(self._fp, size)
629
629
630 def seek(self, offset, whence=0):
630 def seek(self, offset, whence=0):
631 """move the underlying file pointer"""
631 """move the underlying file pointer"""
632 if self._seekable:
632 if self._seekable:
633 return self._fp.seek(offset, whence)
633 return self._fp.seek(offset, whence)
634 else:
634 else:
635 raise NotImplementedError(_('File pointer is not seekable'))
635 raise NotImplementedError(_('File pointer is not seekable'))
636
636
637 def tell(self):
637 def tell(self):
638 """return the file offset, or None if file is not seekable"""
638 """return the file offset, or None if file is not seekable"""
639 if self._seekable:
639 if self._seekable:
640 try:
640 try:
641 return self._fp.tell()
641 return self._fp.tell()
642 except IOError as e:
642 except IOError as e:
643 if e.errno == errno.ESPIPE:
643 if e.errno == errno.ESPIPE:
644 self._seekable = False
644 self._seekable = False
645 else:
645 else:
646 raise
646 raise
647 return None
647 return None
648
648
649 def close(self):
649 def close(self):
650 """close underlying file"""
650 """close underlying file"""
651 if util.safehasattr(self._fp, 'close'):
651 if util.safehasattr(self._fp, 'close'):
652 return self._fp.close()
652 return self._fp.close()
653
653
654 def getunbundler(ui, fp, magicstring=None):
654 def getunbundler(ui, fp, magicstring=None):
655 """return a valid unbundler object for a given magicstring"""
655 """return a valid unbundler object for a given magicstring"""
656 if magicstring is None:
656 if magicstring is None:
657 magicstring = changegroup.readexactly(fp, 4)
657 magicstring = changegroup.readexactly(fp, 4)
658 magic, version = magicstring[0:2], magicstring[2:4]
658 magic, version = magicstring[0:2], magicstring[2:4]
659 if magic != 'HG':
659 if magic != 'HG':
660 raise error.Abort(_('not a Mercurial bundle'))
660 raise error.Abort(_('not a Mercurial bundle'))
661 unbundlerclass = formatmap.get(version)
661 unbundlerclass = formatmap.get(version)
662 if unbundlerclass is None:
662 if unbundlerclass is None:
663 raise error.Abort(_('unknown bundle version %s') % version)
663 raise error.Abort(_('unknown bundle version %s') % version)
664 unbundler = unbundlerclass(ui, fp)
664 unbundler = unbundlerclass(ui, fp)
665 indebug(ui, 'start processing of %s stream' % magicstring)
665 indebug(ui, 'start processing of %s stream' % magicstring)
666 return unbundler
666 return unbundler
667
667
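# Sketch of the reading side (commented out for the same reason: it assumes a
# live ui object and the 'example.hg2' file from the writing sketch above):
#
#   with open('example.hg2', 'rb') as fp:
#       unbundler = getunbundler(ui, fp)
#       for part in unbundler.iterparts():
#           ui.write('part: %s %r\n' % (part.type, part.params))
#           part.read()                              # consume the payload
#
# iterparts() itself seeks past any unread payload after each part is yielded,
# so partially consuming a part does not corrupt the stream.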
668 class unbundle20(unpackermixin):
668 class unbundle20(unpackermixin):
669 """interpret a bundle2 stream
669 """interpret a bundle2 stream
670
670
671 This class is fed with a binary stream and yields parts through its
671 This class is fed with a binary stream and yields parts through its
672 `iterparts` methods."""
672 `iterparts` methods."""
673
673
674 _magicstring = 'HG20'
674 _magicstring = 'HG20'
675
675
676 def __init__(self, ui, fp):
676 def __init__(self, ui, fp):
677 """If header is specified, we do not read it out of the stream."""
677 """If header is specified, we do not read it out of the stream."""
678 self.ui = ui
678 self.ui = ui
679 self._compengine = util.compengines.forbundletype('UN')
679 self._compengine = util.compengines.forbundletype('UN')
680 self._compressed = None
680 self._compressed = None
681 super(unbundle20, self).__init__(fp)
681 super(unbundle20, self).__init__(fp)
682
682
683 @util.propertycache
683 @util.propertycache
684 def params(self):
684 def params(self):
685 """dictionary of stream level parameters"""
685 """dictionary of stream level parameters"""
686 indebug(self.ui, 'reading bundle2 stream parameters')
686 indebug(self.ui, 'reading bundle2 stream parameters')
687 params = {}
687 params = {}
688 paramssize = self._unpack(_fstreamparamsize)[0]
688 paramssize = self._unpack(_fstreamparamsize)[0]
689 if paramssize < 0:
689 if paramssize < 0:
690 raise error.BundleValueError('negative bundle param size: %i'
690 raise error.BundleValueError('negative bundle param size: %i'
691 % paramssize)
691 % paramssize)
692 if paramssize:
692 if paramssize:
693 params = self._readexact(paramssize)
693 params = self._readexact(paramssize)
694 params = self._processallparams(params)
694 params = self._processallparams(params)
695 return params
695 return params
696
696
697 def _processallparams(self, paramsblock):
697 def _processallparams(self, paramsblock):
698 """"""
698 """"""
699 params = util.sortdict()
699 params = util.sortdict()
700 for p in paramsblock.split(' '):
700 for p in paramsblock.split(' '):
701 p = p.split('=', 1)
701 p = p.split('=', 1)
702 p = [urlreq.unquote(i) for i in p]
702 p = [urlreq.unquote(i) for i in p]
703 if len(p) < 2:
703 if len(p) < 2:
704 p.append(None)
704 p.append(None)
705 self._processparam(*p)
705 self._processparam(*p)
706 params[p[0]] = p[1]
706 params[p[0]] = p[1]
707 return params
707 return params
708
708
709
709
710 def _processparam(self, name, value):
710 def _processparam(self, name, value):
711 """process a parameter, applying its effect if needed
711 """process a parameter, applying its effect if needed
712
712
713 Parameters starting with a lower case letter are advisory and will be
713 Parameters starting with a lower case letter are advisory and will be
714 ignored when unknown. Those starting with an upper case letter are
714 ignored when unknown. Those starting with an upper case letter are
715 mandatory, and this function will raise a KeyError when they are unknown.
715 mandatory, and this function will raise a KeyError when they are unknown.
716
716
717 Note: no options are currently supported. Any input will either be
717 Note: no options are currently supported. Any input will either be
718 ignored or fail.
718 ignored or fail.
719 """
719 """
720 if not name:
720 if not name:
721 raise ValueError('empty parameter name')
721 raise ValueError('empty parameter name')
722 if name[0] not in string.letters:
722 if name[0] not in string.letters:
723 raise ValueError('non letter first character: %r' % name)
723 raise ValueError('non letter first character: %r' % name)
724 try:
724 try:
725 handler = b2streamparamsmap[name.lower()]
725 handler = b2streamparamsmap[name.lower()]
726 except KeyError:
726 except KeyError:
727 if name[0].islower():
727 if name[0].islower():
728 indebug(self.ui, "ignoring unknown parameter %r" % name)
728 indebug(self.ui, "ignoring unknown parameter %r" % name)
729 else:
729 else:
730 raise error.BundleUnknownFeatureError(params=(name,))
730 raise error.BundleUnknownFeatureError(params=(name,))
731 else:
731 else:
732 handler(self, name, value)
732 handler(self, name, value)
733
733
734 def _forwardchunks(self):
734 def _forwardchunks(self):
735 """utility to transfer a bundle2 as binary
735 """utility to transfer a bundle2 as binary
736
736
737 This is made necessary by the fact that the 'getbundle' command over 'ssh'
737 This is made necessary by the fact that the 'getbundle' command over 'ssh'
738 has no way to know when the reply ends, relying on the bundle being
738 has no way to know when the reply ends, relying on the bundle being
739 interpreted to find its end. This is terrible and we are sorry, but we
739 interpreted to find its end. This is terrible and we are sorry, but we
740 needed to move forward to get general delta enabled.
740 needed to move forward to get general delta enabled.
741 """
741 """
742 yield self._magicstring
742 yield self._magicstring
743 assert 'params' not in vars(self)
743 assert 'params' not in vars(self)
744 paramssize = self._unpack(_fstreamparamsize)[0]
744 paramssize = self._unpack(_fstreamparamsize)[0]
745 if paramssize < 0:
745 if paramssize < 0:
746 raise error.BundleValueError('negative bundle param size: %i'
746 raise error.BundleValueError('negative bundle param size: %i'
747 % paramssize)
747 % paramssize)
748 yield _pack(_fstreamparamsize, paramssize)
748 yield _pack(_fstreamparamsize, paramssize)
749 if paramssize:
749 if paramssize:
750 params = self._readexact(paramssize)
750 params = self._readexact(paramssize)
751 self._processallparams(params)
751 self._processallparams(params)
752 yield params
752 yield params
753 assert self._compengine.bundletype == 'UN'
753 assert self._compengine.bundletype == 'UN'
754 # From there, payload might need to be decompressed
754 # From there, payload might need to be decompressed
755 self._fp = self._compengine.decompressorreader(self._fp)
755 self._fp = self._compengine.decompressorreader(self._fp)
756 emptycount = 0
756 emptycount = 0
757 while emptycount < 2:
757 while emptycount < 2:
758 # so we can brainlessly loop
758 # so we can brainlessly loop
759 assert _fpartheadersize == _fpayloadsize
759 assert _fpartheadersize == _fpayloadsize
760 size = self._unpack(_fpartheadersize)[0]
760 size = self._unpack(_fpartheadersize)[0]
761 yield _pack(_fpartheadersize, size)
761 yield _pack(_fpartheadersize, size)
762 if size:
762 if size:
763 emptycount = 0
763 emptycount = 0
764 else:
764 else:
765 emptycount += 1
765 emptycount += 1
766 continue
766 continue
767 if size == flaginterrupt:
767 if size == flaginterrupt:
768 continue
768 continue
769 elif size < 0:
769 elif size < 0:
770 raise error.BundleValueError('negative chunk size: %i')
770 raise error.BundleValueError('negative chunk size: %i')
771 yield self._readexact(size)
771 yield self._readexact(size)
772
772
773
773
774 def iterparts(self):
774 def iterparts(self):
775 """yield all parts contained in the stream"""
775 """yield all parts contained in the stream"""
776 # make sure param have been loaded
776 # make sure param have been loaded
777 self.params
777 self.params
778 # From there, payload need to be decompressed
778 # From there, payload need to be decompressed
779 self._fp = self._compengine.decompressorreader(self._fp)
779 self._fp = self._compengine.decompressorreader(self._fp)
780 indebug(self.ui, 'start extraction of bundle2 parts')
780 indebug(self.ui, 'start extraction of bundle2 parts')
781 headerblock = self._readpartheader()
781 headerblock = self._readpartheader()
782 while headerblock is not None:
782 while headerblock is not None:
783 part = unbundlepart(self.ui, headerblock, self._fp)
783 part = unbundlepart(self.ui, headerblock, self._fp)
784 yield part
784 yield part
785 part.seek(0, 2)
785 part.seek(0, 2)
786 headerblock = self._readpartheader()
786 headerblock = self._readpartheader()
787 indebug(self.ui, 'end of bundle2 stream')
787 indebug(self.ui, 'end of bundle2 stream')
788
788
789 def _readpartheader(self):
789 def _readpartheader(self):
790 """reads a part header size and return the bytes blob
790 """reads a part header size and return the bytes blob
791
791
792 returns None if empty"""
792 returns None if empty"""
793 headersize = self._unpack(_fpartheadersize)[0]
793 headersize = self._unpack(_fpartheadersize)[0]
794 if headersize < 0:
794 if headersize < 0:
795 raise error.BundleValueError('negative part header size: %i'
795 raise error.BundleValueError('negative part header size: %i'
796 % headersize)
796 % headersize)
797 indebug(self.ui, 'part header size: %i' % headersize)
797 indebug(self.ui, 'part header size: %i' % headersize)
798 if headersize:
798 if headersize:
799 return self._readexact(headersize)
799 return self._readexact(headersize)
800 return None
800 return None
801
801
802 def compressed(self):
802 def compressed(self):
803 self.params # load params
803 self.params # load params
804 return self._compressed
804 return self._compressed
805
805
806 formatmap = {'20': unbundle20}
806 formatmap = {'20': unbundle20}
807
807
808 b2streamparamsmap = {}
808 b2streamparamsmap = {}
809
809
810 def b2streamparamhandler(name):
810 def b2streamparamhandler(name):
811 """register a handler for a stream level parameter"""
811 """register a handler for a stream level parameter"""
812 def decorator(func):
812 def decorator(func):
813 assert name not in formatmap
813 assert name not in formatmap
814 b2streamparamsmap[name] = func
814 b2streamparamsmap[name] = func
815 return func
815 return func
816 return decorator
816 return decorator
817
817
818 @b2streamparamhandler('compression')
818 @b2streamparamhandler('compression')
819 def processcompression(unbundler, param, value):
819 def processcompression(unbundler, param, value):
820 """read compression parameter and install payload decompression"""
820 """read compression parameter and install payload decompression"""
821 if value not in util.compengines.supportedbundletypes:
821 if value not in util.compengines.supportedbundletypes:
822 raise error.BundleUnknownFeatureError(params=(param,),
822 raise error.BundleUnknownFeatureError(params=(param,),
823 values=(value,))
823 values=(value,))
824 unbundler._compengine = util.compengines.forbundletype(value)
824 unbundler._compengine = util.compengines.forbundletype(value)
825 if value is not None:
825 if value is not None:
826 unbundler._compressed = True
826 unbundler._compressed = True
827
827
828 class bundlepart(object):
828 class bundlepart(object):
829 """A bundle2 part contains application level payload
829 """A bundle2 part contains application level payload
830
830
831 The part `type` is used to route the part to the application level
831 The part `type` is used to route the part to the application level
832 handler.
832 handler.
833
833
834 The part payload is contained in ``part.data``. It could be raw bytes or a
834 The part payload is contained in ``part.data``. It could be raw bytes or a
835 generator of byte chunks.
835 generator of byte chunks.
836
836
837 You can add parameters to the part using the ``addparam`` method.
837 You can add parameters to the part using the ``addparam`` method.
838 Parameters can be either mandatory (default) or advisory. Remote side
838 Parameters can be either mandatory (default) or advisory. Remote side
839 should be able to safely ignore the advisory ones.
839 should be able to safely ignore the advisory ones.
840
840
841 Both data and parameters cannot be modified after the generation has begun.
841 Both data and parameters cannot be modified after the generation has begun.
842 """
842 """
843
843
844 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
844 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
845 data='', mandatory=True):
845 data='', mandatory=True):
846 validateparttype(parttype)
846 validateparttype(parttype)
847 self.id = None
847 self.id = None
848 self.type = parttype
848 self.type = parttype
849 self._data = data
849 self._data = data
850 self._mandatoryparams = list(mandatoryparams)
850 self._mandatoryparams = list(mandatoryparams)
851 self._advisoryparams = list(advisoryparams)
851 self._advisoryparams = list(advisoryparams)
852 # checking for duplicated entries
852 # checking for duplicated entries
853 self._seenparams = set()
853 self._seenparams = set()
854 for pname, __ in self._mandatoryparams + self._advisoryparams:
854 for pname, __ in self._mandatoryparams + self._advisoryparams:
855 if pname in self._seenparams:
855 if pname in self._seenparams:
856 raise RuntimeError('duplicated params: %s' % pname)
856 raise RuntimeError('duplicated params: %s' % pname)
857 self._seenparams.add(pname)
857 self._seenparams.add(pname)
858 # status of the part's generation:
858 # status of the part's generation:
859 # - None: not started,
859 # - None: not started,
860 # - False: currently generated,
860 # - False: currently generated,
861 # - True: generation done.
861 # - True: generation done.
862 self._generated = None
862 self._generated = None
863 self.mandatory = mandatory
863 self.mandatory = mandatory
864
864
865 def __repr__(self):
866 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
867 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
868 % (cls, id(self), self.id, self.type, self.mandatory))
869
865 def copy(self):
870 def copy(self):
866 """return a copy of the part
871 """return a copy of the part
867
872
868 The new part has the very same content but no partid assigned yet.
873 The new part has the very same content but no partid assigned yet.
869 Parts with generated data cannot be copied."""
874 Parts with generated data cannot be copied."""
870 assert not util.safehasattr(self.data, 'next')
875 assert not util.safehasattr(self.data, 'next')
871 return self.__class__(self.type, self._mandatoryparams,
876 return self.__class__(self.type, self._mandatoryparams,
872 self._advisoryparams, self._data, self.mandatory)
877 self._advisoryparams, self._data, self.mandatory)
873
878
874 # methods used to defines the part content
879 # methods used to defines the part content
875 @property
880 @property
876 def data(self):
881 def data(self):
877 return self._data
882 return self._data
878
883
879 @data.setter
884 @data.setter
880 def data(self, data):
885 def data(self, data):
881 if self._generated is not None:
886 if self._generated is not None:
882 raise error.ReadOnlyPartError('part is being generated')
887 raise error.ReadOnlyPartError('part is being generated')
883 self._data = data
888 self._data = data
884
889
885 @property
890 @property
886 def mandatoryparams(self):
891 def mandatoryparams(self):
887 # make it an immutable tuple to force people through ``addparam``
892 # make it an immutable tuple to force people through ``addparam``
888 return tuple(self._mandatoryparams)
893 return tuple(self._mandatoryparams)
889
894
890 @property
895 @property
891 def advisoryparams(self):
896 def advisoryparams(self):
892 # make it an immutable tuple to force people through ``addparam``
897 # make it an immutable tuple to force people through ``addparam``
893 return tuple(self._advisoryparams)
898 return tuple(self._advisoryparams)
894
899
895 def addparam(self, name, value='', mandatory=True):
900 def addparam(self, name, value='', mandatory=True):
896 if self._generated is not None:
901 if self._generated is not None:
897 raise error.ReadOnlyPartError('part is being generated')
902 raise error.ReadOnlyPartError('part is being generated')
898 if name in self._seenparams:
903 if name in self._seenparams:
899 raise ValueError('duplicated params: %s' % name)
904 raise ValueError('duplicated params: %s' % name)
900 self._seenparams.add(name)
905 self._seenparams.add(name)
901 params = self._advisoryparams
906 params = self._advisoryparams
902 if mandatory:
907 if mandatory:
903 params = self._mandatoryparams
908 params = self._mandatoryparams
904 params.append((name, value))
909 params.append((name, value))
905
910
    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise RuntimeError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                 ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % exc)
            exc_info = sys.exc_info()
            msg = 'unexpected error: %s' % exc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            if pycompat.ispy3:
                raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
            else:
                exec("""raise exc_info[0], exc_info[1], exc_info[2]""")
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if util.safehasattr(self.data, 'next'):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data

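# Illustrative usage sketch: building and serializing a part by hand. The part
# type 'test:song' and the manual id assignment are assumptions for the
# example; in normal use bundle20.newpart() creates the part and assigns the
# id before the bundle is generated.
def _examplebuildpart(ui):
    part = bundlepart('test:song', data='some payload bytes')
    part.addparam('verse', '1', mandatory=False)
    part.id = 0
    return ''.join(part.getchunks(ui=ui))
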
flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting an exception raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise RuntimeError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] #(payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to setup all logic related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, super(unbundlepart, self).tell()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            super(unbundlepart, self).seek(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos,
                                             super(unbundlepart, self).tell()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % self.id)
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

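# Illustrative usage sketch: iterating over the parts of an incoming bundle2
# stream. 'fp' is assumed to be a file-like object positioned at the start of
# a bundle2 stream and 'ui' a ui instance; getunbundler() and iterparts() are
# defined earlier in this module.
def _exampleiterparts(ui, fp):
    unbundler = getunbundler(ui, fp)
    for part in unbundler.iterparts():
        ui.debug('found part %s: "%s"\n' % (part.id, part.type))
        data = part.read()  # read() returns the full payload as a string
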
# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    return caps

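# Illustrative usage sketch: how these capabilities are typically advertised
# to clients. The exact wire format is owned by the wireproto layer; the
# urlquoted blob shape shown here is an assumption for the example.
# encodecaps() is defined earlier in this module.
def _exampleadvertisecaps(repo):
    return 'bundle2=' + urlreq.quote(encodecaps(getrepocaps(repo)))
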
def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

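# Illustrative usage sketch: reading a peer's capabilities on the client side.
# 'remote' is assumed to be a peer object (for example from hg.peer()); a caps
# entry such as ('V0', 'V1') maps to the version list [0, 1].
def _examplepeersupportsobsmarkers(remote):
    caps = bundle2caps(remote)
    return bool(obsmarkersversion(caps))
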
def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', str(cg.extras['clcount']),
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only supports v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

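# Illustrative usage sketch: writing a changegroup out as a bzip2-compressed
# bundle2 file. 'cg' is assumed to be a changegroup packer exposing
# getchunks(), and 'BZ' is assumed to be an available stream compression name
# (see util.compengines.supportedbundletypes).
def _examplewritebundle(ui, cg):
    return writebundle(ui, cg, 'example-bundle.hg', 'HG20', compression='BZ')
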
@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will see massive rework before
    being inflicted on any end-user.
    """
    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the ones contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
    ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate what was
        retrieved by the client matches the server knowledge about the bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest with
        that name. Like the size, it is used to validate what was retrieved by
        the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
            parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
            % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
            util.hidepassword(raw_url))
    ret = cg.apply(op.repo, 'bundle2', 'bundle2')
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
            (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

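# Illustrative usage sketch: how the push side typically emits the part
# handled above. 'bundler' is assumed to be a bundle20 instance and
# 'remoteheads' a list of 20-byte binary node ids known to be the remote's
# heads when the push started.
def _exampleaddcheckheads(bundler, remoteheads):
    return bundler.newpart('check:heads', data=iter(remoteheads))
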
@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

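# Illustrative usage sketch: how the push side typically emits a pushkey part
# (for example for bookmark updates). 'bundler' is assumed to be a bundle20
# instance; every value goes through pushkey.encode before being attached.
def _exampleaddpushkeypart(bundler, namespace, key, old, new):
    enc = pushkey.encode
    part = bundler.newpart('pushkey')
    part.addparam('namespace', enc(namespace))
    part.addparam('key', enc(key))
    part.addparam('old', enc(old))
    part.addparam('new', enc(new))
    return part
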
@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # we want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)

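# Illustrative usage sketch: adding an obsolescence-marker part to an outgoing
# bundle. The fixed format version 0 below is an assumption for the example;
# real callers negotiate the version from the receiver's capabilities (see
# obsmarkersversion() above).
def _exampleaddobsmarkerspart(bundler, markers):
    data = obsolete.encodemarkers(markers, True, version=0)
    return bundler.newpart('obsmarkers', data=data)
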
@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)