##// END OF EJS Templates
bundle2: use a sorted dict for holding parameters...
Gregory Szorc -
r29591:6215b553 default
parent child Browse files
Show More
@@ -1,1608 +1,1608 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part header. When the header is empty
69 The total number of Bytes used by the part header. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 from __future__ import absolute_import
148 from __future__ import absolute_import
149
149
150 import errno
150 import errno
151 import re
151 import re
152 import string
152 import string
153 import struct
153 import struct
154 import sys
154 import sys
155
155
156 from .i18n import _
156 from .i18n import _
157 from . import (
157 from . import (
158 changegroup,
158 changegroup,
159 error,
159 error,
160 obsolete,
160 obsolete,
161 pushkey,
161 pushkey,
162 tags,
162 tags,
163 url,
163 url,
164 util,
164 util,
165 )
165 )
166
166
167 urlerr = util.urlerr
167 urlerr = util.urlerr
168 urlreq = util.urlreq
168 urlreq = util.urlreq
169
169
170 _pack = struct.pack
170 _pack = struct.pack
171 _unpack = struct.unpack
171 _unpack = struct.unpack
172
172
173 _fstreamparamsize = '>i'
173 _fstreamparamsize = '>i'
174 _fpartheadersize = '>i'
174 _fpartheadersize = '>i'
175 _fparttypesize = '>B'
175 _fparttypesize = '>B'
176 _fpartid = '>I'
176 _fpartid = '>I'
177 _fpayloadsize = '>i'
177 _fpayloadsize = '>i'
178 _fpartparamcount = '>BB'
178 _fpartparamcount = '>BB'
179
179
180 preferedchunksize = 4096
180 preferedchunksize = 4096
181
181
182 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
182 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
183
183
184 def outdebug(ui, message):
184 def outdebug(ui, message):
185 """debug regarding output stream (bundling)"""
185 """debug regarding output stream (bundling)"""
186 if ui.configbool('devel', 'bundle2.debug', False):
186 if ui.configbool('devel', 'bundle2.debug', False):
187 ui.debug('bundle2-output: %s\n' % message)
187 ui.debug('bundle2-output: %s\n' % message)
188
188
189 def indebug(ui, message):
189 def indebug(ui, message):
190 """debug on input stream (unbundling)"""
190 """debug on input stream (unbundling)"""
191 if ui.configbool('devel', 'bundle2.debug', False):
191 if ui.configbool('devel', 'bundle2.debug', False):
192 ui.debug('bundle2-input: %s\n' % message)
192 ui.debug('bundle2-input: %s\n' % message)
193
193
194 def validateparttype(parttype):
194 def validateparttype(parttype):
195 """raise ValueError if a parttype contains invalid character"""
195 """raise ValueError if a parttype contains invalid character"""
196 if _parttypeforbidden.search(parttype):
196 if _parttypeforbidden.search(parttype):
197 raise ValueError(parttype)
197 raise ValueError(parttype)
198
198
199 def _makefpartparamsizes(nbparams):
199 def _makefpartparamsizes(nbparams):
200 """return a struct format to read part parameter sizes
200 """return a struct format to read part parameter sizes
201
201
202 The number parameters is variable so we need to build that format
202 The number parameters is variable so we need to build that format
203 dynamically.
203 dynamically.
204 """
204 """
205 return '>'+('BB'*nbparams)
205 return '>'+('BB'*nbparams)
206
206
207 parthandlermapping = {}
207 parthandlermapping = {}
208
208
209 def parthandler(parttype, params=()):
209 def parthandler(parttype, params=()):
210 """decorator that register a function as a bundle2 part handler
210 """decorator that register a function as a bundle2 part handler
211
211
212 eg::
212 eg::
213
213
214 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
214 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
215 def myparttypehandler(...):
215 def myparttypehandler(...):
216 '''process a part of type "my part".'''
216 '''process a part of type "my part".'''
217 ...
217 ...
218 """
218 """
219 validateparttype(parttype)
219 validateparttype(parttype)
220 def _decorator(func):
220 def _decorator(func):
221 lparttype = parttype.lower() # enforce lower case matching.
221 lparttype = parttype.lower() # enforce lower case matching.
222 assert lparttype not in parthandlermapping
222 assert lparttype not in parthandlermapping
223 parthandlermapping[lparttype] = func
223 parthandlermapping[lparttype] = func
224 func.params = frozenset(params)
224 func.params = frozenset(params)
225 return func
225 return func
226 return _decorator
226 return _decorator
227
227
228 class unbundlerecords(object):
228 class unbundlerecords(object):
229 """keep record of what happens during and unbundle
229 """keep record of what happens during and unbundle
230
230
231 New records are added using `records.add('cat', obj)`. Where 'cat' is a
231 New records are added using `records.add('cat', obj)`. Where 'cat' is a
232 category of record and obj is an arbitrary object.
232 category of record and obj is an arbitrary object.
233
233
234 `records['cat']` will return all entries of this category 'cat'.
234 `records['cat']` will return all entries of this category 'cat'.
235
235
236 Iterating on the object itself will yield `('category', obj)` tuples
236 Iterating on the object itself will yield `('category', obj)` tuples
237 for all entries.
237 for all entries.
238
238
239 All iterations happens in chronological order.
239 All iterations happens in chronological order.
240 """
240 """
241
241
242 def __init__(self):
242 def __init__(self):
243 self._categories = {}
243 self._categories = {}
244 self._sequences = []
244 self._sequences = []
245 self._replies = {}
245 self._replies = {}
246
246
247 def add(self, category, entry, inreplyto=None):
247 def add(self, category, entry, inreplyto=None):
248 """add a new record of a given category.
248 """add a new record of a given category.
249
249
250 The entry can then be retrieved in the list returned by
250 The entry can then be retrieved in the list returned by
251 self['category']."""
251 self['category']."""
252 self._categories.setdefault(category, []).append(entry)
252 self._categories.setdefault(category, []).append(entry)
253 self._sequences.append((category, entry))
253 self._sequences.append((category, entry))
254 if inreplyto is not None:
254 if inreplyto is not None:
255 self.getreplies(inreplyto).add(category, entry)
255 self.getreplies(inreplyto).add(category, entry)
256
256
257 def getreplies(self, partid):
257 def getreplies(self, partid):
258 """get the records that are replies to a specific part"""
258 """get the records that are replies to a specific part"""
259 return self._replies.setdefault(partid, unbundlerecords())
259 return self._replies.setdefault(partid, unbundlerecords())
260
260
261 def __getitem__(self, cat):
261 def __getitem__(self, cat):
262 return tuple(self._categories.get(cat, ()))
262 return tuple(self._categories.get(cat, ()))
263
263
264 def __iter__(self):
264 def __iter__(self):
265 return iter(self._sequences)
265 return iter(self._sequences)
266
266
267 def __len__(self):
267 def __len__(self):
268 return len(self._sequences)
268 return len(self._sequences)
269
269
270 def __nonzero__(self):
270 def __nonzero__(self):
271 return bool(self._sequences)
271 return bool(self._sequences)
272
272
273 class bundleoperation(object):
273 class bundleoperation(object):
274 """an object that represents a single bundling process
274 """an object that represents a single bundling process
275
275
276 Its purpose is to carry unbundle-related objects and states.
276 Its purpose is to carry unbundle-related objects and states.
277
277
278 A new object should be created at the beginning of each bundle processing.
278 A new object should be created at the beginning of each bundle processing.
279 The object is to be returned by the processing function.
279 The object is to be returned by the processing function.
280
280
281 The object has very little content now it will ultimately contain:
281 The object has very little content now it will ultimately contain:
282 * an access to the repo the bundle is applied to,
282 * an access to the repo the bundle is applied to,
283 * a ui object,
283 * a ui object,
284 * a way to retrieve a transaction to add changes to the repo,
284 * a way to retrieve a transaction to add changes to the repo,
285 * a way to record the result of processing each part,
285 * a way to record the result of processing each part,
286 * a way to construct a bundle response when applicable.
286 * a way to construct a bundle response when applicable.
287 """
287 """
288
288
289 def __init__(self, repo, transactiongetter, captureoutput=True):
289 def __init__(self, repo, transactiongetter, captureoutput=True):
290 self.repo = repo
290 self.repo = repo
291 self.ui = repo.ui
291 self.ui = repo.ui
292 self.records = unbundlerecords()
292 self.records = unbundlerecords()
293 self.gettransaction = transactiongetter
293 self.gettransaction = transactiongetter
294 self.reply = None
294 self.reply = None
295 self.captureoutput = captureoutput
295 self.captureoutput = captureoutput
296
296
297 class TransactionUnavailable(RuntimeError):
297 class TransactionUnavailable(RuntimeError):
298 pass
298 pass
299
299
300 def _notransaction():
300 def _notransaction():
301 """default method to get a transaction while processing a bundle
301 """default method to get a transaction while processing a bundle
302
302
303 Raise an exception to highlight the fact that no transaction was expected
303 Raise an exception to highlight the fact that no transaction was expected
304 to be created"""
304 to be created"""
305 raise TransactionUnavailable()
305 raise TransactionUnavailable()
306
306
307 def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
307 def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
308 # transform me into unbundler.apply() as soon as the freeze is lifted
308 # transform me into unbundler.apply() as soon as the freeze is lifted
309 tr.hookargs['bundle2'] = '1'
309 tr.hookargs['bundle2'] = '1'
310 if source is not None and 'source' not in tr.hookargs:
310 if source is not None and 'source' not in tr.hookargs:
311 tr.hookargs['source'] = source
311 tr.hookargs['source'] = source
312 if url is not None and 'url' not in tr.hookargs:
312 if url is not None and 'url' not in tr.hookargs:
313 tr.hookargs['url'] = url
313 tr.hookargs['url'] = url
314 return processbundle(repo, unbundler, lambda: tr, op=op)
314 return processbundle(repo, unbundler, lambda: tr, op=op)
315
315
316 def processbundle(repo, unbundler, transactiongetter=None, op=None):
316 def processbundle(repo, unbundler, transactiongetter=None, op=None):
317 """This function process a bundle, apply effect to/from a repo
317 """This function process a bundle, apply effect to/from a repo
318
318
319 It iterates over each part then searches for and uses the proper handling
319 It iterates over each part then searches for and uses the proper handling
320 code to process the part. Parts are processed in order.
320 code to process the part. Parts are processed in order.
321
321
322 This is very early version of this function that will be strongly reworked
322 This is very early version of this function that will be strongly reworked
323 before final usage.
323 before final usage.
324
324
325 Unknown Mandatory part will abort the process.
325 Unknown Mandatory part will abort the process.
326
326
327 It is temporarily possible to provide a prebuilt bundleoperation to the
327 It is temporarily possible to provide a prebuilt bundleoperation to the
328 function. This is used to ensure output is properly propagated in case of
328 function. This is used to ensure output is properly propagated in case of
329 an error during the unbundling. This output capturing part will likely be
329 an error during the unbundling. This output capturing part will likely be
330 reworked and this ability will probably go away in the process.
330 reworked and this ability will probably go away in the process.
331 """
331 """
332 if op is None:
332 if op is None:
333 if transactiongetter is None:
333 if transactiongetter is None:
334 transactiongetter = _notransaction
334 transactiongetter = _notransaction
335 op = bundleoperation(repo, transactiongetter)
335 op = bundleoperation(repo, transactiongetter)
336 # todo:
336 # todo:
337 # - replace this is a init function soon.
337 # - replace this is a init function soon.
338 # - exception catching
338 # - exception catching
339 unbundler.params
339 unbundler.params
340 if repo.ui.debugflag:
340 if repo.ui.debugflag:
341 msg = ['bundle2-input-bundle:']
341 msg = ['bundle2-input-bundle:']
342 if unbundler.params:
342 if unbundler.params:
343 msg.append(' %i params')
343 msg.append(' %i params')
344 if op.gettransaction is None:
344 if op.gettransaction is None:
345 msg.append(' no-transaction')
345 msg.append(' no-transaction')
346 else:
346 else:
347 msg.append(' with-transaction')
347 msg.append(' with-transaction')
348 msg.append('\n')
348 msg.append('\n')
349 repo.ui.debug(''.join(msg))
349 repo.ui.debug(''.join(msg))
350 iterparts = enumerate(unbundler.iterparts())
350 iterparts = enumerate(unbundler.iterparts())
351 part = None
351 part = None
352 nbpart = 0
352 nbpart = 0
353 try:
353 try:
354 for nbpart, part in iterparts:
354 for nbpart, part in iterparts:
355 _processpart(op, part)
355 _processpart(op, part)
356 except BaseException as exc:
356 except BaseException as exc:
357 for nbpart, part in iterparts:
357 for nbpart, part in iterparts:
358 # consume the bundle content
358 # consume the bundle content
359 part.seek(0, 2)
359 part.seek(0, 2)
360 # Small hack to let caller code distinguish exceptions from bundle2
360 # Small hack to let caller code distinguish exceptions from bundle2
361 # processing from processing the old format. This is mostly
361 # processing from processing the old format. This is mostly
362 # needed to handle different return codes to unbundle according to the
362 # needed to handle different return codes to unbundle according to the
363 # type of bundle. We should probably clean up or drop this return code
363 # type of bundle. We should probably clean up or drop this return code
364 # craziness in a future version.
364 # craziness in a future version.
365 exc.duringunbundle2 = True
365 exc.duringunbundle2 = True
366 salvaged = []
366 salvaged = []
367 replycaps = None
367 replycaps = None
368 if op.reply is not None:
368 if op.reply is not None:
369 salvaged = op.reply.salvageoutput()
369 salvaged = op.reply.salvageoutput()
370 replycaps = op.reply.capabilities
370 replycaps = op.reply.capabilities
371 exc._replycaps = replycaps
371 exc._replycaps = replycaps
372 exc._bundle2salvagedoutput = salvaged
372 exc._bundle2salvagedoutput = salvaged
373 raise
373 raise
374 finally:
374 finally:
375 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
375 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
376
376
377 return op
377 return op
378
378
379 def _processpart(op, part):
379 def _processpart(op, part):
380 """process a single part from a bundle
380 """process a single part from a bundle
381
381
382 The part is guaranteed to have been fully consumed when the function exits
382 The part is guaranteed to have been fully consumed when the function exits
383 (even if an exception is raised)."""
383 (even if an exception is raised)."""
384 status = 'unknown' # used by debug output
384 status = 'unknown' # used by debug output
385 try:
385 try:
386 try:
386 try:
387 handler = parthandlermapping.get(part.type)
387 handler = parthandlermapping.get(part.type)
388 if handler is None:
388 if handler is None:
389 status = 'unsupported-type'
389 status = 'unsupported-type'
390 raise error.BundleUnknownFeatureError(parttype=part.type)
390 raise error.BundleUnknownFeatureError(parttype=part.type)
391 indebug(op.ui, 'found a handler for part %r' % part.type)
391 indebug(op.ui, 'found a handler for part %r' % part.type)
392 unknownparams = part.mandatorykeys - handler.params
392 unknownparams = part.mandatorykeys - handler.params
393 if unknownparams:
393 if unknownparams:
394 unknownparams = list(unknownparams)
394 unknownparams = list(unknownparams)
395 unknownparams.sort()
395 unknownparams.sort()
396 status = 'unsupported-params (%s)' % unknownparams
396 status = 'unsupported-params (%s)' % unknownparams
397 raise error.BundleUnknownFeatureError(parttype=part.type,
397 raise error.BundleUnknownFeatureError(parttype=part.type,
398 params=unknownparams)
398 params=unknownparams)
399 status = 'supported'
399 status = 'supported'
400 except error.BundleUnknownFeatureError as exc:
400 except error.BundleUnknownFeatureError as exc:
401 if part.mandatory: # mandatory parts
401 if part.mandatory: # mandatory parts
402 raise
402 raise
403 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
403 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
404 return # skip to part processing
404 return # skip to part processing
405 finally:
405 finally:
406 if op.ui.debugflag:
406 if op.ui.debugflag:
407 msg = ['bundle2-input-part: "%s"' % part.type]
407 msg = ['bundle2-input-part: "%s"' % part.type]
408 if not part.mandatory:
408 if not part.mandatory:
409 msg.append(' (advisory)')
409 msg.append(' (advisory)')
410 nbmp = len(part.mandatorykeys)
410 nbmp = len(part.mandatorykeys)
411 nbap = len(part.params) - nbmp
411 nbap = len(part.params) - nbmp
412 if nbmp or nbap:
412 if nbmp or nbap:
413 msg.append(' (params:')
413 msg.append(' (params:')
414 if nbmp:
414 if nbmp:
415 msg.append(' %i mandatory' % nbmp)
415 msg.append(' %i mandatory' % nbmp)
416 if nbap:
416 if nbap:
417 msg.append(' %i advisory' % nbmp)
417 msg.append(' %i advisory' % nbmp)
418 msg.append(')')
418 msg.append(')')
419 msg.append(' %s\n' % status)
419 msg.append(' %s\n' % status)
420 op.ui.debug(''.join(msg))
420 op.ui.debug(''.join(msg))
421
421
422 # handler is called outside the above try block so that we don't
422 # handler is called outside the above try block so that we don't
423 # risk catching KeyErrors from anything other than the
423 # risk catching KeyErrors from anything other than the
424 # parthandlermapping lookup (any KeyError raised by handler()
424 # parthandlermapping lookup (any KeyError raised by handler()
425 # itself represents a defect of a different variety).
425 # itself represents a defect of a different variety).
426 output = None
426 output = None
427 if op.captureoutput and op.reply is not None:
427 if op.captureoutput and op.reply is not None:
428 op.ui.pushbuffer(error=True, subproc=True)
428 op.ui.pushbuffer(error=True, subproc=True)
429 output = ''
429 output = ''
430 try:
430 try:
431 handler(op, part)
431 handler(op, part)
432 finally:
432 finally:
433 if output is not None:
433 if output is not None:
434 output = op.ui.popbuffer()
434 output = op.ui.popbuffer()
435 if output:
435 if output:
436 outpart = op.reply.newpart('output', data=output,
436 outpart = op.reply.newpart('output', data=output,
437 mandatory=False)
437 mandatory=False)
438 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
438 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
439 finally:
439 finally:
440 # consume the part content to not corrupt the stream.
440 # consume the part content to not corrupt the stream.
441 part.seek(0, 2)
441 part.seek(0, 2)
442
442
443
443
444 def decodecaps(blob):
444 def decodecaps(blob):
445 """decode a bundle2 caps bytes blob into a dictionary
445 """decode a bundle2 caps bytes blob into a dictionary
446
446
447 The blob is a list of capabilities (one per line)
447 The blob is a list of capabilities (one per line)
448 Capabilities may have values using a line of the form::
448 Capabilities may have values using a line of the form::
449
449
450 capability=value1,value2,value3
450 capability=value1,value2,value3
451
451
452 The values are always a list."""
452 The values are always a list."""
453 caps = {}
453 caps = {}
454 for line in blob.splitlines():
454 for line in blob.splitlines():
455 if not line:
455 if not line:
456 continue
456 continue
457 if '=' not in line:
457 if '=' not in line:
458 key, vals = line, ()
458 key, vals = line, ()
459 else:
459 else:
460 key, vals = line.split('=', 1)
460 key, vals = line.split('=', 1)
461 vals = vals.split(',')
461 vals = vals.split(',')
462 key = urlreq.unquote(key)
462 key = urlreq.unquote(key)
463 vals = [urlreq.unquote(v) for v in vals]
463 vals = [urlreq.unquote(v) for v in vals]
464 caps[key] = vals
464 caps[key] = vals
465 return caps
465 return caps
466
466
467 def encodecaps(caps):
467 def encodecaps(caps):
468 """encode a bundle2 caps dictionary into a bytes blob"""
468 """encode a bundle2 caps dictionary into a bytes blob"""
469 chunks = []
469 chunks = []
470 for ca in sorted(caps):
470 for ca in sorted(caps):
471 vals = caps[ca]
471 vals = caps[ca]
472 ca = urlreq.quote(ca)
472 ca = urlreq.quote(ca)
473 vals = [urlreq.quote(v) for v in vals]
473 vals = [urlreq.quote(v) for v in vals]
474 if vals:
474 if vals:
475 ca = "%s=%s" % (ca, ','.join(vals))
475 ca = "%s=%s" % (ca, ','.join(vals))
476 chunks.append(ca)
476 chunks.append(ca)
477 return '\n'.join(chunks)
477 return '\n'.join(chunks)
478
478
479 bundletypes = {
479 bundletypes = {
480 "": ("", None), # only when using unbundle on ssh and old http servers
480 "": ("", None), # only when using unbundle on ssh and old http servers
481 # since the unification ssh accepts a header but there
481 # since the unification ssh accepts a header but there
482 # is no capability signaling it.
482 # is no capability signaling it.
483 "HG20": (), # special-cased below
483 "HG20": (), # special-cased below
484 "HG10UN": ("HG10UN", None),
484 "HG10UN": ("HG10UN", None),
485 "HG10BZ": ("HG10", 'BZ'),
485 "HG10BZ": ("HG10", 'BZ'),
486 "HG10GZ": ("HG10GZ", 'GZ'),
486 "HG10GZ": ("HG10GZ", 'GZ'),
487 }
487 }
488
488
489 # hgweb uses this list to communicate its preferred type
489 # hgweb uses this list to communicate its preferred type
490 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
490 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
491
491
492 class bundle20(object):
492 class bundle20(object):
493 """represent an outgoing bundle2 container
493 """represent an outgoing bundle2 container
494
494
495 Use the `addparam` method to add stream level parameter. and `newpart` to
495 Use the `addparam` method to add stream level parameter. and `newpart` to
496 populate it. Then call `getchunks` to retrieve all the binary chunks of
496 populate it. Then call `getchunks` to retrieve all the binary chunks of
497 data that compose the bundle2 container."""
497 data that compose the bundle2 container."""
498
498
499 _magicstring = 'HG20'
499 _magicstring = 'HG20'
500
500
501 def __init__(self, ui, capabilities=()):
501 def __init__(self, ui, capabilities=()):
502 self.ui = ui
502 self.ui = ui
503 self._params = []
503 self._params = []
504 self._parts = []
504 self._parts = []
505 self.capabilities = dict(capabilities)
505 self.capabilities = dict(capabilities)
506 self._compressor = util.compressors[None]()
506 self._compressor = util.compressors[None]()
507
507
508 def setcompression(self, alg):
508 def setcompression(self, alg):
509 """setup core part compression to <alg>"""
509 """setup core part compression to <alg>"""
510 if alg is None:
510 if alg is None:
511 return
511 return
512 assert not any(n.lower() == 'Compression' for n, v in self._params)
512 assert not any(n.lower() == 'Compression' for n, v in self._params)
513 self.addparam('Compression', alg)
513 self.addparam('Compression', alg)
514 self._compressor = util.compressors[alg]()
514 self._compressor = util.compressors[alg]()
515
515
516 @property
516 @property
517 def nbparts(self):
517 def nbparts(self):
518 """total number of parts added to the bundler"""
518 """total number of parts added to the bundler"""
519 return len(self._parts)
519 return len(self._parts)
520
520
521 # methods used to defines the bundle2 content
521 # methods used to defines the bundle2 content
522 def addparam(self, name, value=None):
522 def addparam(self, name, value=None):
523 """add a stream level parameter"""
523 """add a stream level parameter"""
524 if not name:
524 if not name:
525 raise ValueError('empty parameter name')
525 raise ValueError('empty parameter name')
526 if name[0] not in string.letters:
526 if name[0] not in string.letters:
527 raise ValueError('non letter first character: %r' % name)
527 raise ValueError('non letter first character: %r' % name)
528 self._params.append((name, value))
528 self._params.append((name, value))
529
529
530 def addpart(self, part):
530 def addpart(self, part):
531 """add a new part to the bundle2 container
531 """add a new part to the bundle2 container
532
532
533 Parts contains the actual applicative payload."""
533 Parts contains the actual applicative payload."""
534 assert part.id is None
534 assert part.id is None
535 part.id = len(self._parts) # very cheap counter
535 part.id = len(self._parts) # very cheap counter
536 self._parts.append(part)
536 self._parts.append(part)
537
537
538 def newpart(self, typeid, *args, **kwargs):
538 def newpart(self, typeid, *args, **kwargs):
539 """create a new part and add it to the containers
539 """create a new part and add it to the containers
540
540
541 As the part is directly added to the containers. For now, this means
541 As the part is directly added to the containers. For now, this means
542 that any failure to properly initialize the part after calling
542 that any failure to properly initialize the part after calling
543 ``newpart`` should result in a failure of the whole bundling process.
543 ``newpart`` should result in a failure of the whole bundling process.
544
544
545 You can still fall back to manually create and add if you need better
545 You can still fall back to manually create and add if you need better
546 control."""
546 control."""
547 part = bundlepart(typeid, *args, **kwargs)
547 part = bundlepart(typeid, *args, **kwargs)
548 self.addpart(part)
548 self.addpart(part)
549 return part
549 return part
550
550
551 # methods used to generate the bundle2 stream
551 # methods used to generate the bundle2 stream
552 def getchunks(self):
552 def getchunks(self):
553 if self.ui.debugflag:
553 if self.ui.debugflag:
554 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
554 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
555 if self._params:
555 if self._params:
556 msg.append(' (%i params)' % len(self._params))
556 msg.append(' (%i params)' % len(self._params))
557 msg.append(' %i parts total\n' % len(self._parts))
557 msg.append(' %i parts total\n' % len(self._parts))
558 self.ui.debug(''.join(msg))
558 self.ui.debug(''.join(msg))
559 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
559 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
560 yield self._magicstring
560 yield self._magicstring
561 param = self._paramchunk()
561 param = self._paramchunk()
562 outdebug(self.ui, 'bundle parameter: %s' % param)
562 outdebug(self.ui, 'bundle parameter: %s' % param)
563 yield _pack(_fstreamparamsize, len(param))
563 yield _pack(_fstreamparamsize, len(param))
564 if param:
564 if param:
565 yield param
565 yield param
566 # starting compression
566 # starting compression
567 for chunk in self._getcorechunk():
567 for chunk in self._getcorechunk():
568 yield self._compressor.compress(chunk)
568 yield self._compressor.compress(chunk)
569 yield self._compressor.flush()
569 yield self._compressor.flush()
570
570
571 def _paramchunk(self):
571 def _paramchunk(self):
572 """return a encoded version of all stream parameters"""
572 """return a encoded version of all stream parameters"""
573 blocks = []
573 blocks = []
574 for par, value in self._params:
574 for par, value in self._params:
575 par = urlreq.quote(par)
575 par = urlreq.quote(par)
576 if value is not None:
576 if value is not None:
577 value = urlreq.quote(value)
577 value = urlreq.quote(value)
578 par = '%s=%s' % (par, value)
578 par = '%s=%s' % (par, value)
579 blocks.append(par)
579 blocks.append(par)
580 return ' '.join(blocks)
580 return ' '.join(blocks)
581
581
582 def _getcorechunk(self):
582 def _getcorechunk(self):
583 """yield chunk for the core part of the bundle
583 """yield chunk for the core part of the bundle
584
584
585 (all but headers and parameters)"""
585 (all but headers and parameters)"""
586 outdebug(self.ui, 'start of parts')
586 outdebug(self.ui, 'start of parts')
587 for part in self._parts:
587 for part in self._parts:
588 outdebug(self.ui, 'bundle part: "%s"' % part.type)
588 outdebug(self.ui, 'bundle part: "%s"' % part.type)
589 for chunk in part.getchunks(ui=self.ui):
589 for chunk in part.getchunks(ui=self.ui):
590 yield chunk
590 yield chunk
591 outdebug(self.ui, 'end of bundle')
591 outdebug(self.ui, 'end of bundle')
592 yield _pack(_fpartheadersize, 0)
592 yield _pack(_fpartheadersize, 0)
593
593
594
594
595 def salvageoutput(self):
595 def salvageoutput(self):
596 """return a list with a copy of all output parts in the bundle
596 """return a list with a copy of all output parts in the bundle
597
597
598 This is meant to be used during error handling to make sure we preserve
598 This is meant to be used during error handling to make sure we preserve
599 server output"""
599 server output"""
600 salvaged = []
600 salvaged = []
601 for part in self._parts:
601 for part in self._parts:
602 if part.type.startswith('output'):
602 if part.type.startswith('output'):
603 salvaged.append(part.copy())
603 salvaged.append(part.copy())
604 return salvaged
604 return salvaged
605
605
606
606
607 class unpackermixin(object):
607 class unpackermixin(object):
608 """A mixin to extract bytes and struct data from a stream"""
608 """A mixin to extract bytes and struct data from a stream"""
609
609
610 def __init__(self, fp):
610 def __init__(self, fp):
611 self._fp = fp
611 self._fp = fp
612 self._seekable = (util.safehasattr(fp, 'seek') and
612 self._seekable = (util.safehasattr(fp, 'seek') and
613 util.safehasattr(fp, 'tell'))
613 util.safehasattr(fp, 'tell'))
614
614
615 def _unpack(self, format):
615 def _unpack(self, format):
616 """unpack this struct format from the stream"""
616 """unpack this struct format from the stream"""
617 data = self._readexact(struct.calcsize(format))
617 data = self._readexact(struct.calcsize(format))
618 return _unpack(format, data)
618 return _unpack(format, data)
619
619
620 def _readexact(self, size):
620 def _readexact(self, size):
621 """read exactly <size> bytes from the stream"""
621 """read exactly <size> bytes from the stream"""
622 return changegroup.readexactly(self._fp, size)
622 return changegroup.readexactly(self._fp, size)
623
623
624 def seek(self, offset, whence=0):
624 def seek(self, offset, whence=0):
625 """move the underlying file pointer"""
625 """move the underlying file pointer"""
626 if self._seekable:
626 if self._seekable:
627 return self._fp.seek(offset, whence)
627 return self._fp.seek(offset, whence)
628 else:
628 else:
629 raise NotImplementedError(_('File pointer is not seekable'))
629 raise NotImplementedError(_('File pointer is not seekable'))
630
630
631 def tell(self):
631 def tell(self):
632 """return the file offset, or None if file is not seekable"""
632 """return the file offset, or None if file is not seekable"""
633 if self._seekable:
633 if self._seekable:
634 try:
634 try:
635 return self._fp.tell()
635 return self._fp.tell()
636 except IOError as e:
636 except IOError as e:
637 if e.errno == errno.ESPIPE:
637 if e.errno == errno.ESPIPE:
638 self._seekable = False
638 self._seekable = False
639 else:
639 else:
640 raise
640 raise
641 return None
641 return None
642
642
643 def close(self):
643 def close(self):
644 """close underlying file"""
644 """close underlying file"""
645 if util.safehasattr(self._fp, 'close'):
645 if util.safehasattr(self._fp, 'close'):
646 return self._fp.close()
646 return self._fp.close()
647
647
648 def getunbundler(ui, fp, magicstring=None):
648 def getunbundler(ui, fp, magicstring=None):
649 """return a valid unbundler object for a given magicstring"""
649 """return a valid unbundler object for a given magicstring"""
650 if magicstring is None:
650 if magicstring is None:
651 magicstring = changegroup.readexactly(fp, 4)
651 magicstring = changegroup.readexactly(fp, 4)
652 magic, version = magicstring[0:2], magicstring[2:4]
652 magic, version = magicstring[0:2], magicstring[2:4]
653 if magic != 'HG':
653 if magic != 'HG':
654 raise error.Abort(_('not a Mercurial bundle'))
654 raise error.Abort(_('not a Mercurial bundle'))
655 unbundlerclass = formatmap.get(version)
655 unbundlerclass = formatmap.get(version)
656 if unbundlerclass is None:
656 if unbundlerclass is None:
657 raise error.Abort(_('unknown bundle version %s') % version)
657 raise error.Abort(_('unknown bundle version %s') % version)
658 unbundler = unbundlerclass(ui, fp)
658 unbundler = unbundlerclass(ui, fp)
659 indebug(ui, 'start processing of %s stream' % magicstring)
659 indebug(ui, 'start processing of %s stream' % magicstring)
660 return unbundler
660 return unbundler
661
661
662 class unbundle20(unpackermixin):
662 class unbundle20(unpackermixin):
663 """interpret a bundle2 stream
663 """interpret a bundle2 stream
664
664
665 This class is fed with a binary stream and yields parts through its
665 This class is fed with a binary stream and yields parts through its
666 `iterparts` methods."""
666 `iterparts` methods."""
667
667
668 _magicstring = 'HG20'
668 _magicstring = 'HG20'
669
669
670 def __init__(self, ui, fp):
670 def __init__(self, ui, fp):
671 """If header is specified, we do not read it out of the stream."""
671 """If header is specified, we do not read it out of the stream."""
672 self.ui = ui
672 self.ui = ui
673 self._decompressor = util.decompressors[None]
673 self._decompressor = util.decompressors[None]
674 self._compressed = None
674 self._compressed = None
675 super(unbundle20, self).__init__(fp)
675 super(unbundle20, self).__init__(fp)
676
676
677 @util.propertycache
677 @util.propertycache
678 def params(self):
678 def params(self):
679 """dictionary of stream level parameters"""
679 """dictionary of stream level parameters"""
680 indebug(self.ui, 'reading bundle2 stream parameters')
680 indebug(self.ui, 'reading bundle2 stream parameters')
681 params = {}
681 params = {}
682 paramssize = self._unpack(_fstreamparamsize)[0]
682 paramssize = self._unpack(_fstreamparamsize)[0]
683 if paramssize < 0:
683 if paramssize < 0:
684 raise error.BundleValueError('negative bundle param size: %i'
684 raise error.BundleValueError('negative bundle param size: %i'
685 % paramssize)
685 % paramssize)
686 if paramssize:
686 if paramssize:
687 params = self._readexact(paramssize)
687 params = self._readexact(paramssize)
688 params = self._processallparams(params)
688 params = self._processallparams(params)
689 return params
689 return params
690
690
691 def _processallparams(self, paramsblock):
691 def _processallparams(self, paramsblock):
692 """"""
692 """"""
693 params = {}
693 params = util.sortdict()
694 for p in paramsblock.split(' '):
694 for p in paramsblock.split(' '):
695 p = p.split('=', 1)
695 p = p.split('=', 1)
696 p = [urlreq.unquote(i) for i in p]
696 p = [urlreq.unquote(i) for i in p]
697 if len(p) < 2:
697 if len(p) < 2:
698 p.append(None)
698 p.append(None)
699 self._processparam(*p)
699 self._processparam(*p)
700 params[p[0]] = p[1]
700 params[p[0]] = p[1]
701 return params
701 return params
702
702
703
703
704 def _processparam(self, name, value):
704 def _processparam(self, name, value):
705 """process a parameter, applying its effect if needed
705 """process a parameter, applying its effect if needed
706
706
707 Parameter starting with a lower case letter are advisory and will be
707 Parameter starting with a lower case letter are advisory and will be
708 ignored when unknown. Those starting with an upper case letter are
708 ignored when unknown. Those starting with an upper case letter are
709 mandatory and will this function will raise a KeyError when unknown.
709 mandatory and will this function will raise a KeyError when unknown.
710
710
711 Note: no option are currently supported. Any input will be either
711 Note: no option are currently supported. Any input will be either
712 ignored or failing.
712 ignored or failing.
713 """
713 """
714 if not name:
714 if not name:
715 raise ValueError('empty parameter name')
715 raise ValueError('empty parameter name')
716 if name[0] not in string.letters:
716 if name[0] not in string.letters:
717 raise ValueError('non letter first character: %r' % name)
717 raise ValueError('non letter first character: %r' % name)
718 try:
718 try:
719 handler = b2streamparamsmap[name.lower()]
719 handler = b2streamparamsmap[name.lower()]
720 except KeyError:
720 except KeyError:
721 if name[0].islower():
721 if name[0].islower():
722 indebug(self.ui, "ignoring unknown parameter %r" % name)
722 indebug(self.ui, "ignoring unknown parameter %r" % name)
723 else:
723 else:
724 raise error.BundleUnknownFeatureError(params=(name,))
724 raise error.BundleUnknownFeatureError(params=(name,))
725 else:
725 else:
726 handler(self, name, value)
726 handler(self, name, value)
727
727
728 def _forwardchunks(self):
728 def _forwardchunks(self):
729 """utility to transfer a bundle2 as binary
729 """utility to transfer a bundle2 as binary
730
730
731 This is made necessary by the fact the 'getbundle' command over 'ssh'
731 This is made necessary by the fact the 'getbundle' command over 'ssh'
732 have no way to know then the reply end, relying on the bundle to be
732 have no way to know then the reply end, relying on the bundle to be
733 interpreted to know its end. This is terrible and we are sorry, but we
733 interpreted to know its end. This is terrible and we are sorry, but we
734 needed to move forward to get general delta enabled.
734 needed to move forward to get general delta enabled.
735 """
735 """
736 yield self._magicstring
736 yield self._magicstring
737 assert 'params' not in vars(self)
737 assert 'params' not in vars(self)
738 paramssize = self._unpack(_fstreamparamsize)[0]
738 paramssize = self._unpack(_fstreamparamsize)[0]
739 if paramssize < 0:
739 if paramssize < 0:
740 raise error.BundleValueError('negative bundle param size: %i'
740 raise error.BundleValueError('negative bundle param size: %i'
741 % paramssize)
741 % paramssize)
742 yield _pack(_fstreamparamsize, paramssize)
742 yield _pack(_fstreamparamsize, paramssize)
743 if paramssize:
743 if paramssize:
744 params = self._readexact(paramssize)
744 params = self._readexact(paramssize)
745 self._processallparams(params)
745 self._processallparams(params)
746 yield params
746 yield params
747 assert self._decompressor is util.decompressors[None]
747 assert self._decompressor is util.decompressors[None]
748 # From there, payload might need to be decompressed
748 # From there, payload might need to be decompressed
749 self._fp = self._decompressor(self._fp)
749 self._fp = self._decompressor(self._fp)
750 emptycount = 0
750 emptycount = 0
751 while emptycount < 2:
751 while emptycount < 2:
752 # so we can brainlessly loop
752 # so we can brainlessly loop
753 assert _fpartheadersize == _fpayloadsize
753 assert _fpartheadersize == _fpayloadsize
754 size = self._unpack(_fpartheadersize)[0]
754 size = self._unpack(_fpartheadersize)[0]
755 yield _pack(_fpartheadersize, size)
755 yield _pack(_fpartheadersize, size)
756 if size:
756 if size:
757 emptycount = 0
757 emptycount = 0
758 else:
758 else:
759 emptycount += 1
759 emptycount += 1
760 continue
760 continue
761 if size == flaginterrupt:
761 if size == flaginterrupt:
762 continue
762 continue
763 elif size < 0:
763 elif size < 0:
764 raise error.BundleValueError('negative chunk size: %i')
764 raise error.BundleValueError('negative chunk size: %i')
765 yield self._readexact(size)
765 yield self._readexact(size)
766
766
767
767
768 def iterparts(self):
768 def iterparts(self):
769 """yield all parts contained in the stream"""
769 """yield all parts contained in the stream"""
770 # make sure param have been loaded
770 # make sure param have been loaded
771 self.params
771 self.params
772 # From there, payload need to be decompressed
772 # From there, payload need to be decompressed
773 self._fp = self._decompressor(self._fp)
773 self._fp = self._decompressor(self._fp)
774 indebug(self.ui, 'start extraction of bundle2 parts')
774 indebug(self.ui, 'start extraction of bundle2 parts')
775 headerblock = self._readpartheader()
775 headerblock = self._readpartheader()
776 while headerblock is not None:
776 while headerblock is not None:
777 part = unbundlepart(self.ui, headerblock, self._fp)
777 part = unbundlepart(self.ui, headerblock, self._fp)
778 yield part
778 yield part
779 part.seek(0, 2)
779 part.seek(0, 2)
780 headerblock = self._readpartheader()
780 headerblock = self._readpartheader()
781 indebug(self.ui, 'end of bundle2 stream')
781 indebug(self.ui, 'end of bundle2 stream')
782
782
783 def _readpartheader(self):
783 def _readpartheader(self):
784 """reads a part header size and return the bytes blob
784 """reads a part header size and return the bytes blob
785
785
786 returns None if empty"""
786 returns None if empty"""
787 headersize = self._unpack(_fpartheadersize)[0]
787 headersize = self._unpack(_fpartheadersize)[0]
788 if headersize < 0:
788 if headersize < 0:
789 raise error.BundleValueError('negative part header size: %i'
789 raise error.BundleValueError('negative part header size: %i'
790 % headersize)
790 % headersize)
791 indebug(self.ui, 'part header size: %i' % headersize)
791 indebug(self.ui, 'part header size: %i' % headersize)
792 if headersize:
792 if headersize:
793 return self._readexact(headersize)
793 return self._readexact(headersize)
794 return None
794 return None
795
795
796 def compressed(self):
796 def compressed(self):
797 self.params # load params
797 self.params # load params
798 return self._compressed
798 return self._compressed
799
799
800 formatmap = {'20': unbundle20}
800 formatmap = {'20': unbundle20}
801
801
802 b2streamparamsmap = {}
802 b2streamparamsmap = {}
803
803
804 def b2streamparamhandler(name):
804 def b2streamparamhandler(name):
805 """register a handler for a stream level parameter"""
805 """register a handler for a stream level parameter"""
806 def decorator(func):
806 def decorator(func):
807 assert name not in formatmap
807 assert name not in formatmap
808 b2streamparamsmap[name] = func
808 b2streamparamsmap[name] = func
809 return func
809 return func
810 return decorator
810 return decorator
811
811
812 @b2streamparamhandler('compression')
812 @b2streamparamhandler('compression')
813 def processcompression(unbundler, param, value):
813 def processcompression(unbundler, param, value):
814 """read compression parameter and install payload decompression"""
814 """read compression parameter and install payload decompression"""
815 if value not in util.decompressors:
815 if value not in util.decompressors:
816 raise error.BundleUnknownFeatureError(params=(param,),
816 raise error.BundleUnknownFeatureError(params=(param,),
817 values=(value,))
817 values=(value,))
818 unbundler._decompressor = util.decompressors[value]
818 unbundler._decompressor = util.decompressors[value]
819 if value is not None:
819 if value is not None:
820 unbundler._compressed = True
820 unbundler._compressed = True
821
821
822 class bundlepart(object):
822 class bundlepart(object):
823 """A bundle2 part contains application level payload
823 """A bundle2 part contains application level payload
824
824
825 The part `type` is used to route the part to the application level
825 The part `type` is used to route the part to the application level
826 handler.
826 handler.
827
827
828 The part payload is contained in ``part.data``. It could be raw bytes or a
828 The part payload is contained in ``part.data``. It could be raw bytes or a
829 generator of byte chunks.
829 generator of byte chunks.
830
830
831 You can add parameters to the part using the ``addparam`` method.
831 You can add parameters to the part using the ``addparam`` method.
832 Parameters can be either mandatory (default) or advisory. Remote side
832 Parameters can be either mandatory (default) or advisory. Remote side
833 should be able to safely ignore the advisory ones.
833 should be able to safely ignore the advisory ones.
834
834
835 Both data and parameters cannot be modified after the generation has begun.
835 Both data and parameters cannot be modified after the generation has begun.
836 """
836 """
837
837
838 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
838 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
839 data='', mandatory=True):
839 data='', mandatory=True):
840 validateparttype(parttype)
840 validateparttype(parttype)
841 self.id = None
841 self.id = None
842 self.type = parttype
842 self.type = parttype
843 self._data = data
843 self._data = data
844 self._mandatoryparams = list(mandatoryparams)
844 self._mandatoryparams = list(mandatoryparams)
845 self._advisoryparams = list(advisoryparams)
845 self._advisoryparams = list(advisoryparams)
846 # checking for duplicated entries
846 # checking for duplicated entries
847 self._seenparams = set()
847 self._seenparams = set()
848 for pname, __ in self._mandatoryparams + self._advisoryparams:
848 for pname, __ in self._mandatoryparams + self._advisoryparams:
849 if pname in self._seenparams:
849 if pname in self._seenparams:
850 raise RuntimeError('duplicated params: %s' % pname)
850 raise RuntimeError('duplicated params: %s' % pname)
851 self._seenparams.add(pname)
851 self._seenparams.add(pname)
852 # status of the part's generation:
852 # status of the part's generation:
853 # - None: not started,
853 # - None: not started,
854 # - False: currently generated,
854 # - False: currently generated,
855 # - True: generation done.
855 # - True: generation done.
856 self._generated = None
856 self._generated = None
857 self.mandatory = mandatory
857 self.mandatory = mandatory
858
858
859 def copy(self):
859 def copy(self):
860 """return a copy of the part
860 """return a copy of the part
861
861
862 The new part have the very same content but no partid assigned yet.
862 The new part have the very same content but no partid assigned yet.
863 Parts with generated data cannot be copied."""
863 Parts with generated data cannot be copied."""
864 assert not util.safehasattr(self.data, 'next')
864 assert not util.safehasattr(self.data, 'next')
865 return self.__class__(self.type, self._mandatoryparams,
865 return self.__class__(self.type, self._mandatoryparams,
866 self._advisoryparams, self._data, self.mandatory)
866 self._advisoryparams, self._data, self.mandatory)
867
867
868 # methods used to defines the part content
868 # methods used to defines the part content
869 @property
869 @property
870 def data(self):
870 def data(self):
871 return self._data
871 return self._data
872
872
873 @data.setter
873 @data.setter
874 def data(self, data):
874 def data(self, data):
875 if self._generated is not None:
875 if self._generated is not None:
876 raise error.ReadOnlyPartError('part is being generated')
876 raise error.ReadOnlyPartError('part is being generated')
877 self._data = data
877 self._data = data
878
878
879 @property
879 @property
880 def mandatoryparams(self):
880 def mandatoryparams(self):
881 # make it an immutable tuple to force people through ``addparam``
881 # make it an immutable tuple to force people through ``addparam``
882 return tuple(self._mandatoryparams)
882 return tuple(self._mandatoryparams)
883
883
884 @property
884 @property
885 def advisoryparams(self):
885 def advisoryparams(self):
886 # make it an immutable tuple to force people through ``addparam``
886 # make it an immutable tuple to force people through ``addparam``
887 return tuple(self._advisoryparams)
887 return tuple(self._advisoryparams)
888
888
889 def addparam(self, name, value='', mandatory=True):
889 def addparam(self, name, value='', mandatory=True):
890 if self._generated is not None:
890 if self._generated is not None:
891 raise error.ReadOnlyPartError('part is being generated')
891 raise error.ReadOnlyPartError('part is being generated')
892 if name in self._seenparams:
892 if name in self._seenparams:
893 raise ValueError('duplicated params: %s' % name)
893 raise ValueError('duplicated params: %s' % name)
894 self._seenparams.add(name)
894 self._seenparams.add(name)
895 params = self._advisoryparams
895 params = self._advisoryparams
896 if mandatory:
896 if mandatory:
897 params = self._mandatoryparams
897 params = self._mandatoryparams
898 params.append((name, value))
898 params.append((name, value))
899
899
900 # methods used to generates the bundle2 stream
900 # methods used to generates the bundle2 stream
901 def getchunks(self, ui):
901 def getchunks(self, ui):
902 if self._generated is not None:
902 if self._generated is not None:
903 raise RuntimeError('part can only be consumed once')
903 raise RuntimeError('part can only be consumed once')
904 self._generated = False
904 self._generated = False
905
905
906 if ui.debugflag:
906 if ui.debugflag:
907 msg = ['bundle2-output-part: "%s"' % self.type]
907 msg = ['bundle2-output-part: "%s"' % self.type]
908 if not self.mandatory:
908 if not self.mandatory:
909 msg.append(' (advisory)')
909 msg.append(' (advisory)')
910 nbmp = len(self.mandatoryparams)
910 nbmp = len(self.mandatoryparams)
911 nbap = len(self.advisoryparams)
911 nbap = len(self.advisoryparams)
912 if nbmp or nbap:
912 if nbmp or nbap:
913 msg.append(' (params:')
913 msg.append(' (params:')
914 if nbmp:
914 if nbmp:
915 msg.append(' %i mandatory' % nbmp)
915 msg.append(' %i mandatory' % nbmp)
916 if nbap:
916 if nbap:
917 msg.append(' %i advisory' % nbmp)
917 msg.append(' %i advisory' % nbmp)
918 msg.append(')')
918 msg.append(')')
919 if not self.data:
919 if not self.data:
920 msg.append(' empty payload')
920 msg.append(' empty payload')
921 elif util.safehasattr(self.data, 'next'):
921 elif util.safehasattr(self.data, 'next'):
922 msg.append(' streamed payload')
922 msg.append(' streamed payload')
923 else:
923 else:
924 msg.append(' %i bytes payload' % len(self.data))
924 msg.append(' %i bytes payload' % len(self.data))
925 msg.append('\n')
925 msg.append('\n')
926 ui.debug(''.join(msg))
926 ui.debug(''.join(msg))
927
927
928 #### header
928 #### header
929 if self.mandatory:
929 if self.mandatory:
930 parttype = self.type.upper()
930 parttype = self.type.upper()
931 else:
931 else:
932 parttype = self.type.lower()
932 parttype = self.type.lower()
933 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
933 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
934 ## parttype
934 ## parttype
935 header = [_pack(_fparttypesize, len(parttype)),
935 header = [_pack(_fparttypesize, len(parttype)),
936 parttype, _pack(_fpartid, self.id),
936 parttype, _pack(_fpartid, self.id),
937 ]
937 ]
938 ## parameters
938 ## parameters
939 # count
939 # count
940 manpar = self.mandatoryparams
940 manpar = self.mandatoryparams
941 advpar = self.advisoryparams
941 advpar = self.advisoryparams
942 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
942 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
943 # size
943 # size
944 parsizes = []
944 parsizes = []
945 for key, value in manpar:
945 for key, value in manpar:
946 parsizes.append(len(key))
946 parsizes.append(len(key))
947 parsizes.append(len(value))
947 parsizes.append(len(value))
948 for key, value in advpar:
948 for key, value in advpar:
949 parsizes.append(len(key))
949 parsizes.append(len(key))
950 parsizes.append(len(value))
950 parsizes.append(len(value))
951 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
951 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
952 header.append(paramsizes)
952 header.append(paramsizes)
953 # key, value
953 # key, value
954 for key, value in manpar:
954 for key, value in manpar:
955 header.append(key)
955 header.append(key)
956 header.append(value)
956 header.append(value)
957 for key, value in advpar:
957 for key, value in advpar:
958 header.append(key)
958 header.append(key)
959 header.append(value)
959 header.append(value)
960 ## finalize header
960 ## finalize header
961 headerchunk = ''.join(header)
961 headerchunk = ''.join(header)
962 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
962 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
963 yield _pack(_fpartheadersize, len(headerchunk))
963 yield _pack(_fpartheadersize, len(headerchunk))
964 yield headerchunk
964 yield headerchunk
965 ## payload
965 ## payload
966 try:
966 try:
967 for chunk in self._payloadchunks():
967 for chunk in self._payloadchunks():
968 outdebug(ui, 'payload chunk size: %i' % len(chunk))
968 outdebug(ui, 'payload chunk size: %i' % len(chunk))
969 yield _pack(_fpayloadsize, len(chunk))
969 yield _pack(_fpayloadsize, len(chunk))
970 yield chunk
970 yield chunk
971 except GeneratorExit:
971 except GeneratorExit:
972 # GeneratorExit means that nobody is listening for our
972 # GeneratorExit means that nobody is listening for our
973 # results anyway, so just bail quickly rather than trying
973 # results anyway, so just bail quickly rather than trying
974 # to produce an error part.
974 # to produce an error part.
975 ui.debug('bundle2-generatorexit\n')
975 ui.debug('bundle2-generatorexit\n')
976 raise
976 raise
977 except BaseException as exc:
977 except BaseException as exc:
978 # backup exception data for later
978 # backup exception data for later
979 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
979 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
980 % exc)
980 % exc)
981 exc_info = sys.exc_info()
981 exc_info = sys.exc_info()
982 msg = 'unexpected error: %s' % exc
982 msg = 'unexpected error: %s' % exc
983 interpart = bundlepart('error:abort', [('message', msg)],
983 interpart = bundlepart('error:abort', [('message', msg)],
984 mandatory=False)
984 mandatory=False)
985 interpart.id = 0
985 interpart.id = 0
986 yield _pack(_fpayloadsize, -1)
986 yield _pack(_fpayloadsize, -1)
987 for chunk in interpart.getchunks(ui=ui):
987 for chunk in interpart.getchunks(ui=ui):
988 yield chunk
988 yield chunk
989 outdebug(ui, 'closing payload chunk')
989 outdebug(ui, 'closing payload chunk')
990 # abort current part payload
990 # abort current part payload
991 yield _pack(_fpayloadsize, 0)
991 yield _pack(_fpayloadsize, 0)
992 raise exc_info[0], exc_info[1], exc_info[2]
992 raise exc_info[0], exc_info[1], exc_info[2]
993 # end of payload
993 # end of payload
994 outdebug(ui, 'closing payload chunk')
994 outdebug(ui, 'closing payload chunk')
995 yield _pack(_fpayloadsize, 0)
995 yield _pack(_fpayloadsize, 0)
996 self._generated = True
996 self._generated = True
997
997
998 def _payloadchunks(self):
998 def _payloadchunks(self):
999 """yield chunks of a the part payload
999 """yield chunks of a the part payload
1000
1000
1001 Exists to handle the different methods to provide data to a part."""
1001 Exists to handle the different methods to provide data to a part."""
1002 # we only support fixed size data now.
1002 # we only support fixed size data now.
1003 # This will be improved in the future.
1003 # This will be improved in the future.
1004 if util.safehasattr(self.data, 'next'):
1004 if util.safehasattr(self.data, 'next'):
1005 buff = util.chunkbuffer(self.data)
1005 buff = util.chunkbuffer(self.data)
1006 chunk = buff.read(preferedchunksize)
1006 chunk = buff.read(preferedchunksize)
1007 while chunk:
1007 while chunk:
1008 yield chunk
1008 yield chunk
1009 chunk = buff.read(preferedchunksize)
1009 chunk = buff.read(preferedchunksize)
1010 elif len(self.data):
1010 elif len(self.data):
1011 yield self.data
1011 yield self.data
1012
1012
1013
1013
1014 flaginterrupt = -1
1014 flaginterrupt = -1
1015
1015
1016 class interrupthandler(unpackermixin):
1016 class interrupthandler(unpackermixin):
1017 """read one part and process it with restricted capability
1017 """read one part and process it with restricted capability
1018
1018
1019 This allows to transmit exception raised on the producer size during part
1019 This allows to transmit exception raised on the producer size during part
1020 iteration while the consumer is reading a part.
1020 iteration while the consumer is reading a part.
1021
1021
1022 Part processed in this manner only have access to a ui object,"""
1022 Part processed in this manner only have access to a ui object,"""
1023
1023
1024 def __init__(self, ui, fp):
1024 def __init__(self, ui, fp):
1025 super(interrupthandler, self).__init__(fp)
1025 super(interrupthandler, self).__init__(fp)
1026 self.ui = ui
1026 self.ui = ui
1027
1027
1028 def _readpartheader(self):
1028 def _readpartheader(self):
1029 """reads a part header size and return the bytes blob
1029 """reads a part header size and return the bytes blob
1030
1030
1031 returns None if empty"""
1031 returns None if empty"""
1032 headersize = self._unpack(_fpartheadersize)[0]
1032 headersize = self._unpack(_fpartheadersize)[0]
1033 if headersize < 0:
1033 if headersize < 0:
1034 raise error.BundleValueError('negative part header size: %i'
1034 raise error.BundleValueError('negative part header size: %i'
1035 % headersize)
1035 % headersize)
1036 indebug(self.ui, 'part header size: %i\n' % headersize)
1036 indebug(self.ui, 'part header size: %i\n' % headersize)
1037 if headersize:
1037 if headersize:
1038 return self._readexact(headersize)
1038 return self._readexact(headersize)
1039 return None
1039 return None
1040
1040
1041 def __call__(self):
1041 def __call__(self):
1042
1042
1043 self.ui.debug('bundle2-input-stream-interrupt:'
1043 self.ui.debug('bundle2-input-stream-interrupt:'
1044 ' opening out of band context\n')
1044 ' opening out of band context\n')
1045 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1045 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1046 headerblock = self._readpartheader()
1046 headerblock = self._readpartheader()
1047 if headerblock is None:
1047 if headerblock is None:
1048 indebug(self.ui, 'no part found during interruption.')
1048 indebug(self.ui, 'no part found during interruption.')
1049 return
1049 return
1050 part = unbundlepart(self.ui, headerblock, self._fp)
1050 part = unbundlepart(self.ui, headerblock, self._fp)
1051 op = interruptoperation(self.ui)
1051 op = interruptoperation(self.ui)
1052 _processpart(op, part)
1052 _processpart(op, part)
1053 self.ui.debug('bundle2-input-stream-interrupt:'
1053 self.ui.debug('bundle2-input-stream-interrupt:'
1054 ' closing out of band context\n')
1054 ' closing out of band context\n')
1055
1055
1056 class interruptoperation(object):
1056 class interruptoperation(object):
1057 """A limited operation to be use by part handler during interruption
1057 """A limited operation to be use by part handler during interruption
1058
1058
1059 It only have access to an ui object.
1059 It only have access to an ui object.
1060 """
1060 """
1061
1061
1062 def __init__(self, ui):
1062 def __init__(self, ui):
1063 self.ui = ui
1063 self.ui = ui
1064 self.reply = None
1064 self.reply = None
1065 self.captureoutput = False
1065 self.captureoutput = False
1066
1066
1067 @property
1067 @property
1068 def repo(self):
1068 def repo(self):
1069 raise RuntimeError('no repo access from stream interruption')
1069 raise RuntimeError('no repo access from stream interruption')
1070
1070
1071 def gettransaction(self):
1071 def gettransaction(self):
1072 raise TransactionUnavailable('no repo access from stream interruption')
1072 raise TransactionUnavailable('no repo access from stream interruption')
1073
1073
1074 class unbundlepart(unpackermixin):
1074 class unbundlepart(unpackermixin):
1075 """a bundle part read from a bundle"""
1075 """a bundle part read from a bundle"""
1076
1076
1077 def __init__(self, ui, header, fp):
1077 def __init__(self, ui, header, fp):
1078 super(unbundlepart, self).__init__(fp)
1078 super(unbundlepart, self).__init__(fp)
1079 self.ui = ui
1079 self.ui = ui
1080 # unbundle state attr
1080 # unbundle state attr
1081 self._headerdata = header
1081 self._headerdata = header
1082 self._headeroffset = 0
1082 self._headeroffset = 0
1083 self._initialized = False
1083 self._initialized = False
1084 self.consumed = False
1084 self.consumed = False
1085 # part data
1085 # part data
1086 self.id = None
1086 self.id = None
1087 self.type = None
1087 self.type = None
1088 self.mandatoryparams = None
1088 self.mandatoryparams = None
1089 self.advisoryparams = None
1089 self.advisoryparams = None
1090 self.params = None
1090 self.params = None
1091 self.mandatorykeys = ()
1091 self.mandatorykeys = ()
1092 self._payloadstream = None
1092 self._payloadstream = None
1093 self._readheader()
1093 self._readheader()
1094 self._mandatory = None
1094 self._mandatory = None
1095 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1095 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1096 self._pos = 0
1096 self._pos = 0
1097
1097
1098 def _fromheader(self, size):
1098 def _fromheader(self, size):
1099 """return the next <size> byte from the header"""
1099 """return the next <size> byte from the header"""
1100 offset = self._headeroffset
1100 offset = self._headeroffset
1101 data = self._headerdata[offset:(offset + size)]
1101 data = self._headerdata[offset:(offset + size)]
1102 self._headeroffset = offset + size
1102 self._headeroffset = offset + size
1103 return data
1103 return data
1104
1104
1105 def _unpackheader(self, format):
1105 def _unpackheader(self, format):
1106 """read given format from header
1106 """read given format from header
1107
1107
1108 This automatically compute the size of the format to read."""
1108 This automatically compute the size of the format to read."""
1109 data = self._fromheader(struct.calcsize(format))
1109 data = self._fromheader(struct.calcsize(format))
1110 return _unpack(format, data)
1110 return _unpack(format, data)
1111
1111
1112 def _initparams(self, mandatoryparams, advisoryparams):
1112 def _initparams(self, mandatoryparams, advisoryparams):
1113 """internal function to setup all logic related parameters"""
1113 """internal function to setup all logic related parameters"""
1114 # make it read only to prevent people touching it by mistake.
1114 # make it read only to prevent people touching it by mistake.
1115 self.mandatoryparams = tuple(mandatoryparams)
1115 self.mandatoryparams = tuple(mandatoryparams)
1116 self.advisoryparams = tuple(advisoryparams)
1116 self.advisoryparams = tuple(advisoryparams)
1117 # user friendly UI
1117 # user friendly UI
1118 self.params = dict(self.mandatoryparams)
1118 self.params = util.sortdict(self.mandatoryparams)
1119 self.params.update(dict(self.advisoryparams))
1119 self.params.update(self.advisoryparams)
1120 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1120 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1121
1121
1122 def _payloadchunks(self, chunknum=0):
1122 def _payloadchunks(self, chunknum=0):
1123 '''seek to specified chunk and start yielding data'''
1123 '''seek to specified chunk and start yielding data'''
1124 if len(self._chunkindex) == 0:
1124 if len(self._chunkindex) == 0:
1125 assert chunknum == 0, 'Must start with chunk 0'
1125 assert chunknum == 0, 'Must start with chunk 0'
1126 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1126 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1127 else:
1127 else:
1128 assert chunknum < len(self._chunkindex), \
1128 assert chunknum < len(self._chunkindex), \
1129 'Unknown chunk %d' % chunknum
1129 'Unknown chunk %d' % chunknum
1130 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1130 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1131
1131
1132 pos = self._chunkindex[chunknum][0]
1132 pos = self._chunkindex[chunknum][0]
1133 payloadsize = self._unpack(_fpayloadsize)[0]
1133 payloadsize = self._unpack(_fpayloadsize)[0]
1134 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1134 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1135 while payloadsize:
1135 while payloadsize:
1136 if payloadsize == flaginterrupt:
1136 if payloadsize == flaginterrupt:
1137 # interruption detection, the handler will now read a
1137 # interruption detection, the handler will now read a
1138 # single part and process it.
1138 # single part and process it.
1139 interrupthandler(self.ui, self._fp)()
1139 interrupthandler(self.ui, self._fp)()
1140 elif payloadsize < 0:
1140 elif payloadsize < 0:
1141 msg = 'negative payload chunk size: %i' % payloadsize
1141 msg = 'negative payload chunk size: %i' % payloadsize
1142 raise error.BundleValueError(msg)
1142 raise error.BundleValueError(msg)
1143 else:
1143 else:
1144 result = self._readexact(payloadsize)
1144 result = self._readexact(payloadsize)
1145 chunknum += 1
1145 chunknum += 1
1146 pos += payloadsize
1146 pos += payloadsize
1147 if chunknum == len(self._chunkindex):
1147 if chunknum == len(self._chunkindex):
1148 self._chunkindex.append((pos,
1148 self._chunkindex.append((pos,
1149 super(unbundlepart, self).tell()))
1149 super(unbundlepart, self).tell()))
1150 yield result
1150 yield result
1151 payloadsize = self._unpack(_fpayloadsize)[0]
1151 payloadsize = self._unpack(_fpayloadsize)[0]
1152 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1152 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1153
1153
1154 def _findchunk(self, pos):
1154 def _findchunk(self, pos):
1155 '''for a given payload position, return a chunk number and offset'''
1155 '''for a given payload position, return a chunk number and offset'''
1156 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1156 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1157 if ppos == pos:
1157 if ppos == pos:
1158 return chunk, 0
1158 return chunk, 0
1159 elif ppos > pos:
1159 elif ppos > pos:
1160 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1160 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1161 raise ValueError('Unknown chunk')
1161 raise ValueError('Unknown chunk')
1162
1162
1163 def _readheader(self):
1163 def _readheader(self):
1164 """read the header and setup the object"""
1164 """read the header and setup the object"""
1165 typesize = self._unpackheader(_fparttypesize)[0]
1165 typesize = self._unpackheader(_fparttypesize)[0]
1166 self.type = self._fromheader(typesize)
1166 self.type = self._fromheader(typesize)
1167 indebug(self.ui, 'part type: "%s"' % self.type)
1167 indebug(self.ui, 'part type: "%s"' % self.type)
1168 self.id = self._unpackheader(_fpartid)[0]
1168 self.id = self._unpackheader(_fpartid)[0]
1169 indebug(self.ui, 'part id: "%s"' % self.id)
1169 indebug(self.ui, 'part id: "%s"' % self.id)
1170 # extract mandatory bit from type
1170 # extract mandatory bit from type
1171 self.mandatory = (self.type != self.type.lower())
1171 self.mandatory = (self.type != self.type.lower())
1172 self.type = self.type.lower()
1172 self.type = self.type.lower()
1173 ## reading parameters
1173 ## reading parameters
1174 # param count
1174 # param count
1175 mancount, advcount = self._unpackheader(_fpartparamcount)
1175 mancount, advcount = self._unpackheader(_fpartparamcount)
1176 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1176 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1177 # param size
1177 # param size
1178 fparamsizes = _makefpartparamsizes(mancount + advcount)
1178 fparamsizes = _makefpartparamsizes(mancount + advcount)
1179 paramsizes = self._unpackheader(fparamsizes)
1179 paramsizes = self._unpackheader(fparamsizes)
1180 # make it a list of couple again
1180 # make it a list of couple again
1181 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1181 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1182 # split mandatory from advisory
1182 # split mandatory from advisory
1183 mansizes = paramsizes[:mancount]
1183 mansizes = paramsizes[:mancount]
1184 advsizes = paramsizes[mancount:]
1184 advsizes = paramsizes[mancount:]
1185 # retrieve param value
1185 # retrieve param value
1186 manparams = []
1186 manparams = []
1187 for key, value in mansizes:
1187 for key, value in mansizes:
1188 manparams.append((self._fromheader(key), self._fromheader(value)))
1188 manparams.append((self._fromheader(key), self._fromheader(value)))
1189 advparams = []
1189 advparams = []
1190 for key, value in advsizes:
1190 for key, value in advsizes:
1191 advparams.append((self._fromheader(key), self._fromheader(value)))
1191 advparams.append((self._fromheader(key), self._fromheader(value)))
1192 self._initparams(manparams, advparams)
1192 self._initparams(manparams, advparams)
1193 ## part payload
1193 ## part payload
1194 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1194 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1195 # we read the data, tell it
1195 # we read the data, tell it
1196 self._initialized = True
1196 self._initialized = True
1197
1197
1198 def read(self, size=None):
1198 def read(self, size=None):
1199 """read payload data"""
1199 """read payload data"""
1200 if not self._initialized:
1200 if not self._initialized:
1201 self._readheader()
1201 self._readheader()
1202 if size is None:
1202 if size is None:
1203 data = self._payloadstream.read()
1203 data = self._payloadstream.read()
1204 else:
1204 else:
1205 data = self._payloadstream.read(size)
1205 data = self._payloadstream.read(size)
1206 self._pos += len(data)
1206 self._pos += len(data)
1207 if size is None or len(data) < size:
1207 if size is None or len(data) < size:
1208 if not self.consumed and self._pos:
1208 if not self.consumed and self._pos:
1209 self.ui.debug('bundle2-input-part: total payload size %i\n'
1209 self.ui.debug('bundle2-input-part: total payload size %i\n'
1210 % self._pos)
1210 % self._pos)
1211 self.consumed = True
1211 self.consumed = True
1212 return data
1212 return data
1213
1213
1214 def tell(self):
1214 def tell(self):
1215 return self._pos
1215 return self._pos
1216
1216
1217 def seek(self, offset, whence=0):
1217 def seek(self, offset, whence=0):
1218 if whence == 0:
1218 if whence == 0:
1219 newpos = offset
1219 newpos = offset
1220 elif whence == 1:
1220 elif whence == 1:
1221 newpos = self._pos + offset
1221 newpos = self._pos + offset
1222 elif whence == 2:
1222 elif whence == 2:
1223 if not self.consumed:
1223 if not self.consumed:
1224 self.read()
1224 self.read()
1225 newpos = self._chunkindex[-1][0] - offset
1225 newpos = self._chunkindex[-1][0] - offset
1226 else:
1226 else:
1227 raise ValueError('Unknown whence value: %r' % (whence,))
1227 raise ValueError('Unknown whence value: %r' % (whence,))
1228
1228
1229 if newpos > self._chunkindex[-1][0] and not self.consumed:
1229 if newpos > self._chunkindex[-1][0] and not self.consumed:
1230 self.read()
1230 self.read()
1231 if not 0 <= newpos <= self._chunkindex[-1][0]:
1231 if not 0 <= newpos <= self._chunkindex[-1][0]:
1232 raise ValueError('Offset out of range')
1232 raise ValueError('Offset out of range')
1233
1233
1234 if self._pos != newpos:
1234 if self._pos != newpos:
1235 chunk, internaloffset = self._findchunk(newpos)
1235 chunk, internaloffset = self._findchunk(newpos)
1236 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1236 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1237 adjust = self.read(internaloffset)
1237 adjust = self.read(internaloffset)
1238 if len(adjust) != internaloffset:
1238 if len(adjust) != internaloffset:
1239 raise error.Abort(_('Seek failed\n'))
1239 raise error.Abort(_('Seek failed\n'))
1240 self._pos = newpos
1240 self._pos = newpos
1241
1241
1242 # These are only the static capabilities.
1242 # These are only the static capabilities.
1243 # Check the 'getrepocaps' function for the rest.
1243 # Check the 'getrepocaps' function for the rest.
1244 capabilities = {'HG20': (),
1244 capabilities = {'HG20': (),
1245 'error': ('abort', 'unsupportedcontent', 'pushraced',
1245 'error': ('abort', 'unsupportedcontent', 'pushraced',
1246 'pushkey'),
1246 'pushkey'),
1247 'listkeys': (),
1247 'listkeys': (),
1248 'pushkey': (),
1248 'pushkey': (),
1249 'digests': tuple(sorted(util.DIGESTS.keys())),
1249 'digests': tuple(sorted(util.DIGESTS.keys())),
1250 'remote-changegroup': ('http', 'https'),
1250 'remote-changegroup': ('http', 'https'),
1251 'hgtagsfnodes': (),
1251 'hgtagsfnodes': (),
1252 }
1252 }
1253
1253
1254 def getrepocaps(repo, allowpushback=False):
1254 def getrepocaps(repo, allowpushback=False):
1255 """return the bundle2 capabilities for a given repo
1255 """return the bundle2 capabilities for a given repo
1256
1256
1257 Exists to allow extensions (like evolution) to mutate the capabilities.
1257 Exists to allow extensions (like evolution) to mutate the capabilities.
1258 """
1258 """
1259 caps = capabilities.copy()
1259 caps = capabilities.copy()
1260 caps['changegroup'] = tuple(sorted(
1260 caps['changegroup'] = tuple(sorted(
1261 changegroup.supportedincomingversions(repo)))
1261 changegroup.supportedincomingversions(repo)))
1262 if obsolete.isenabled(repo, obsolete.exchangeopt):
1262 if obsolete.isenabled(repo, obsolete.exchangeopt):
1263 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1263 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1264 caps['obsmarkers'] = supportedformat
1264 caps['obsmarkers'] = supportedformat
1265 if allowpushback:
1265 if allowpushback:
1266 caps['pushback'] = ()
1266 caps['pushback'] = ()
1267 return caps
1267 return caps
1268
1268
1269 def bundle2caps(remote):
1269 def bundle2caps(remote):
1270 """return the bundle capabilities of a peer as dict"""
1270 """return the bundle capabilities of a peer as dict"""
1271 raw = remote.capable('bundle2')
1271 raw = remote.capable('bundle2')
1272 if not raw and raw != '':
1272 if not raw and raw != '':
1273 return {}
1273 return {}
1274 capsblob = urlreq.unquote(remote.capable('bundle2'))
1274 capsblob = urlreq.unquote(remote.capable('bundle2'))
1275 return decodecaps(capsblob)
1275 return decodecaps(capsblob)
1276
1276
1277 def obsmarkersversion(caps):
1277 def obsmarkersversion(caps):
1278 """extract the list of supported obsmarkers versions from a bundle2caps dict
1278 """extract the list of supported obsmarkers versions from a bundle2caps dict
1279 """
1279 """
1280 obscaps = caps.get('obsmarkers', ())
1280 obscaps = caps.get('obsmarkers', ())
1281 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1281 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1282
1282
1283 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
1283 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
1284 """Write a bundle file and return its filename.
1284 """Write a bundle file and return its filename.
1285
1285
1286 Existing files will not be overwritten.
1286 Existing files will not be overwritten.
1287 If no filename is specified, a temporary file is created.
1287 If no filename is specified, a temporary file is created.
1288 bz2 compression can be turned off.
1288 bz2 compression can be turned off.
1289 The bundle file will be deleted in case of errors.
1289 The bundle file will be deleted in case of errors.
1290 """
1290 """
1291
1291
1292 if bundletype == "HG20":
1292 if bundletype == "HG20":
1293 bundle = bundle20(ui)
1293 bundle = bundle20(ui)
1294 bundle.setcompression(compression)
1294 bundle.setcompression(compression)
1295 part = bundle.newpart('changegroup', data=cg.getchunks())
1295 part = bundle.newpart('changegroup', data=cg.getchunks())
1296 part.addparam('version', cg.version)
1296 part.addparam('version', cg.version)
1297 chunkiter = bundle.getchunks()
1297 chunkiter = bundle.getchunks()
1298 else:
1298 else:
1299 # compression argument is only for the bundle2 case
1299 # compression argument is only for the bundle2 case
1300 assert compression is None
1300 assert compression is None
1301 if cg.version != '01':
1301 if cg.version != '01':
1302 raise error.Abort(_('old bundle types only supports v1 '
1302 raise error.Abort(_('old bundle types only supports v1 '
1303 'changegroups'))
1303 'changegroups'))
1304 header, comp = bundletypes[bundletype]
1304 header, comp = bundletypes[bundletype]
1305 if comp not in util.compressors:
1305 if comp not in util.compressors:
1306 raise error.Abort(_('unknown stream compression type: %s')
1306 raise error.Abort(_('unknown stream compression type: %s')
1307 % comp)
1307 % comp)
1308 z = util.compressors[comp]()
1308 z = util.compressors[comp]()
1309 subchunkiter = cg.getchunks()
1309 subchunkiter = cg.getchunks()
1310 def chunkiter():
1310 def chunkiter():
1311 yield header
1311 yield header
1312 for chunk in subchunkiter:
1312 for chunk in subchunkiter:
1313 yield z.compress(chunk)
1313 yield z.compress(chunk)
1314 yield z.flush()
1314 yield z.flush()
1315 chunkiter = chunkiter()
1315 chunkiter = chunkiter()
1316
1316
1317 # parse the changegroup data, otherwise we will block
1317 # parse the changegroup data, otherwise we will block
1318 # in case of sshrepo because we don't know the end of the stream
1318 # in case of sshrepo because we don't know the end of the stream
1319 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1319 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1320
1320
1321 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1321 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1322 def handlechangegroup(op, inpart):
1322 def handlechangegroup(op, inpart):
1323 """apply a changegroup part on the repo
1323 """apply a changegroup part on the repo
1324
1324
1325 This is a very early implementation that will massive rework before being
1325 This is a very early implementation that will massive rework before being
1326 inflicted to any end-user.
1326 inflicted to any end-user.
1327 """
1327 """
1328 # Make sure we trigger a transaction creation
1328 # Make sure we trigger a transaction creation
1329 #
1329 #
1330 # The addchangegroup function will get a transaction object by itself, but
1330 # The addchangegroup function will get a transaction object by itself, but
1331 # we need to make sure we trigger the creation of a transaction object used
1331 # we need to make sure we trigger the creation of a transaction object used
1332 # for the whole processing scope.
1332 # for the whole processing scope.
1333 op.gettransaction()
1333 op.gettransaction()
1334 unpackerversion = inpart.params.get('version', '01')
1334 unpackerversion = inpart.params.get('version', '01')
1335 # We should raise an appropriate exception here
1335 # We should raise an appropriate exception here
1336 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1336 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1337 # the source and url passed here are overwritten by the one contained in
1337 # the source and url passed here are overwritten by the one contained in
1338 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1338 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1339 nbchangesets = None
1339 nbchangesets = None
1340 if 'nbchanges' in inpart.params:
1340 if 'nbchanges' in inpart.params:
1341 nbchangesets = int(inpart.params.get('nbchanges'))
1341 nbchangesets = int(inpart.params.get('nbchanges'))
1342 if ('treemanifest' in inpart.params and
1342 if ('treemanifest' in inpart.params and
1343 'treemanifest' not in op.repo.requirements):
1343 'treemanifest' not in op.repo.requirements):
1344 if len(op.repo.changelog) != 0:
1344 if len(op.repo.changelog) != 0:
1345 raise error.Abort(_(
1345 raise error.Abort(_(
1346 "bundle contains tree manifests, but local repo is "
1346 "bundle contains tree manifests, but local repo is "
1347 "non-empty and does not use tree manifests"))
1347 "non-empty and does not use tree manifests"))
1348 op.repo.requirements.add('treemanifest')
1348 op.repo.requirements.add('treemanifest')
1349 op.repo._applyopenerreqs()
1349 op.repo._applyopenerreqs()
1350 op.repo._writerequirements()
1350 op.repo._writerequirements()
1351 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1351 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1352 op.records.add('changegroup', {'return': ret})
1352 op.records.add('changegroup', {'return': ret})
1353 if op.reply is not None:
1353 if op.reply is not None:
1354 # This is definitely not the final form of this
1354 # This is definitely not the final form of this
1355 # return. But one need to start somewhere.
1355 # return. But one need to start somewhere.
1356 part = op.reply.newpart('reply:changegroup', mandatory=False)
1356 part = op.reply.newpart('reply:changegroup', mandatory=False)
1357 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1357 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1358 part.addparam('return', '%i' % ret, mandatory=False)
1358 part.addparam('return', '%i' % ret, mandatory=False)
1359 assert not inpart.read()
1359 assert not inpart.read()
1360
1360
1361 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1361 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1362 ['digest:%s' % k for k in util.DIGESTS.keys()])
1362 ['digest:%s' % k for k in util.DIGESTS.keys()])
1363 @parthandler('remote-changegroup', _remotechangegroupparams)
1363 @parthandler('remote-changegroup', _remotechangegroupparams)
1364 def handleremotechangegroup(op, inpart):
1364 def handleremotechangegroup(op, inpart):
1365 """apply a bundle10 on the repo, given an url and validation information
1365 """apply a bundle10 on the repo, given an url and validation information
1366
1366
1367 All the information about the remote bundle to import are given as
1367 All the information about the remote bundle to import are given as
1368 parameters. The parameters include:
1368 parameters. The parameters include:
1369 - url: the url to the bundle10.
1369 - url: the url to the bundle10.
1370 - size: the bundle10 file size. It is used to validate what was
1370 - size: the bundle10 file size. It is used to validate what was
1371 retrieved by the client matches the server knowledge about the bundle.
1371 retrieved by the client matches the server knowledge about the bundle.
1372 - digests: a space separated list of the digest types provided as
1372 - digests: a space separated list of the digest types provided as
1373 parameters.
1373 parameters.
1374 - digest:<digest-type>: the hexadecimal representation of the digest with
1374 - digest:<digest-type>: the hexadecimal representation of the digest with
1375 that name. Like the size, it is used to validate what was retrieved by
1375 that name. Like the size, it is used to validate what was retrieved by
1376 the client matches what the server knows about the bundle.
1376 the client matches what the server knows about the bundle.
1377
1377
1378 When multiple digest types are given, all of them are checked.
1378 When multiple digest types are given, all of them are checked.
1379 """
1379 """
1380 try:
1380 try:
1381 raw_url = inpart.params['url']
1381 raw_url = inpart.params['url']
1382 except KeyError:
1382 except KeyError:
1383 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1383 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1384 parsed_url = util.url(raw_url)
1384 parsed_url = util.url(raw_url)
1385 if parsed_url.scheme not in capabilities['remote-changegroup']:
1385 if parsed_url.scheme not in capabilities['remote-changegroup']:
1386 raise error.Abort(_('remote-changegroup does not support %s urls') %
1386 raise error.Abort(_('remote-changegroup does not support %s urls') %
1387 parsed_url.scheme)
1387 parsed_url.scheme)
1388
1388
1389 try:
1389 try:
1390 size = int(inpart.params['size'])
1390 size = int(inpart.params['size'])
1391 except ValueError:
1391 except ValueError:
1392 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1392 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1393 % 'size')
1393 % 'size')
1394 except KeyError:
1394 except KeyError:
1395 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1395 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1396
1396
1397 digests = {}
1397 digests = {}
1398 for typ in inpart.params.get('digests', '').split():
1398 for typ in inpart.params.get('digests', '').split():
1399 param = 'digest:%s' % typ
1399 param = 'digest:%s' % typ
1400 try:
1400 try:
1401 value = inpart.params[param]
1401 value = inpart.params[param]
1402 except KeyError:
1402 except KeyError:
1403 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1403 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1404 param)
1404 param)
1405 digests[typ] = value
1405 digests[typ] = value
1406
1406
1407 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1407 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1408
1408
1409 # Make sure we trigger a transaction creation
1409 # Make sure we trigger a transaction creation
1410 #
1410 #
1411 # The addchangegroup function will get a transaction object by itself, but
1411 # The addchangegroup function will get a transaction object by itself, but
1412 # we need to make sure we trigger the creation of a transaction object used
1412 # we need to make sure we trigger the creation of a transaction object used
1413 # for the whole processing scope.
1413 # for the whole processing scope.
1414 op.gettransaction()
1414 op.gettransaction()
1415 from . import exchange
1415 from . import exchange
1416 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1416 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1417 if not isinstance(cg, changegroup.cg1unpacker):
1417 if not isinstance(cg, changegroup.cg1unpacker):
1418 raise error.Abort(_('%s: not a bundle version 1.0') %
1418 raise error.Abort(_('%s: not a bundle version 1.0') %
1419 util.hidepassword(raw_url))
1419 util.hidepassword(raw_url))
1420 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1420 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1421 op.records.add('changegroup', {'return': ret})
1421 op.records.add('changegroup', {'return': ret})
1422 if op.reply is not None:
1422 if op.reply is not None:
1423 # This is definitely not the final form of this
1423 # This is definitely not the final form of this
1424 # return. But one need to start somewhere.
1424 # return. But one need to start somewhere.
1425 part = op.reply.newpart('reply:changegroup')
1425 part = op.reply.newpart('reply:changegroup')
1426 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1426 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1427 part.addparam('return', '%i' % ret, mandatory=False)
1427 part.addparam('return', '%i' % ret, mandatory=False)
1428 try:
1428 try:
1429 real_part.validate()
1429 real_part.validate()
1430 except error.Abort as e:
1430 except error.Abort as e:
1431 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1431 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1432 (util.hidepassword(raw_url), str(e)))
1432 (util.hidepassword(raw_url), str(e)))
1433 assert not inpart.read()
1433 assert not inpart.read()
1434
1434
1435 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1435 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1436 def handlereplychangegroup(op, inpart):
1436 def handlereplychangegroup(op, inpart):
1437 ret = int(inpart.params['return'])
1437 ret = int(inpart.params['return'])
1438 replyto = int(inpart.params['in-reply-to'])
1438 replyto = int(inpart.params['in-reply-to'])
1439 op.records.add('changegroup', {'return': ret}, replyto)
1439 op.records.add('changegroup', {'return': ret}, replyto)
1440
1440
1441 @parthandler('check:heads')
1441 @parthandler('check:heads')
1442 def handlecheckheads(op, inpart):
1442 def handlecheckheads(op, inpart):
1443 """check that head of the repo did not change
1443 """check that head of the repo did not change
1444
1444
1445 This is used to detect a push race when using unbundle.
1445 This is used to detect a push race when using unbundle.
1446 This replaces the "heads" argument of unbundle."""
1446 This replaces the "heads" argument of unbundle."""
1447 h = inpart.read(20)
1447 h = inpart.read(20)
1448 heads = []
1448 heads = []
1449 while len(h) == 20:
1449 while len(h) == 20:
1450 heads.append(h)
1450 heads.append(h)
1451 h = inpart.read(20)
1451 h = inpart.read(20)
1452 assert not h
1452 assert not h
1453 # Trigger a transaction so that we are guaranteed to have the lock now.
1453 # Trigger a transaction so that we are guaranteed to have the lock now.
1454 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1454 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1455 op.gettransaction()
1455 op.gettransaction()
1456 if sorted(heads) != sorted(op.repo.heads()):
1456 if sorted(heads) != sorted(op.repo.heads()):
1457 raise error.PushRaced('repository changed while pushing - '
1457 raise error.PushRaced('repository changed while pushing - '
1458 'please try again')
1458 'please try again')
1459
1459
1460 @parthandler('output')
1460 @parthandler('output')
1461 def handleoutput(op, inpart):
1461 def handleoutput(op, inpart):
1462 """forward output captured on the server to the client"""
1462 """forward output captured on the server to the client"""
1463 for line in inpart.read().splitlines():
1463 for line in inpart.read().splitlines():
1464 op.ui.status(('remote: %s\n' % line))
1464 op.ui.status(('remote: %s\n' % line))
1465
1465
1466 @parthandler('replycaps')
1466 @parthandler('replycaps')
1467 def handlereplycaps(op, inpart):
1467 def handlereplycaps(op, inpart):
1468 """Notify that a reply bundle should be created
1468 """Notify that a reply bundle should be created
1469
1469
1470 The payload contains the capabilities information for the reply"""
1470 The payload contains the capabilities information for the reply"""
1471 caps = decodecaps(inpart.read())
1471 caps = decodecaps(inpart.read())
1472 if op.reply is None:
1472 if op.reply is None:
1473 op.reply = bundle20(op.ui, caps)
1473 op.reply = bundle20(op.ui, caps)
1474
1474
1475 class AbortFromPart(error.Abort):
1475 class AbortFromPart(error.Abort):
1476 """Sub-class of Abort that denotes an error from a bundle2 part."""
1476 """Sub-class of Abort that denotes an error from a bundle2 part."""
1477
1477
1478 @parthandler('error:abort', ('message', 'hint'))
1478 @parthandler('error:abort', ('message', 'hint'))
1479 def handleerrorabort(op, inpart):
1479 def handleerrorabort(op, inpart):
1480 """Used to transmit abort error over the wire"""
1480 """Used to transmit abort error over the wire"""
1481 raise AbortFromPart(inpart.params['message'],
1481 raise AbortFromPart(inpart.params['message'],
1482 hint=inpart.params.get('hint'))
1482 hint=inpart.params.get('hint'))
1483
1483
1484 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1484 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1485 'in-reply-to'))
1485 'in-reply-to'))
1486 def handleerrorpushkey(op, inpart):
1486 def handleerrorpushkey(op, inpart):
1487 """Used to transmit failure of a mandatory pushkey over the wire"""
1487 """Used to transmit failure of a mandatory pushkey over the wire"""
1488 kwargs = {}
1488 kwargs = {}
1489 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1489 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1490 value = inpart.params.get(name)
1490 value = inpart.params.get(name)
1491 if value is not None:
1491 if value is not None:
1492 kwargs[name] = value
1492 kwargs[name] = value
1493 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1493 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1494
1494
1495 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1495 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1496 def handleerrorunsupportedcontent(op, inpart):
1496 def handleerrorunsupportedcontent(op, inpart):
1497 """Used to transmit unknown content error over the wire"""
1497 """Used to transmit unknown content error over the wire"""
1498 kwargs = {}
1498 kwargs = {}
1499 parttype = inpart.params.get('parttype')
1499 parttype = inpart.params.get('parttype')
1500 if parttype is not None:
1500 if parttype is not None:
1501 kwargs['parttype'] = parttype
1501 kwargs['parttype'] = parttype
1502 params = inpart.params.get('params')
1502 params = inpart.params.get('params')
1503 if params is not None:
1503 if params is not None:
1504 kwargs['params'] = params.split('\0')
1504 kwargs['params'] = params.split('\0')
1505
1505
1506 raise error.BundleUnknownFeatureError(**kwargs)
1506 raise error.BundleUnknownFeatureError(**kwargs)
1507
1507
1508 @parthandler('error:pushraced', ('message',))
1508 @parthandler('error:pushraced', ('message',))
1509 def handleerrorpushraced(op, inpart):
1509 def handleerrorpushraced(op, inpart):
1510 """Used to transmit push race error over the wire"""
1510 """Used to transmit push race error over the wire"""
1511 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1511 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1512
1512
1513 @parthandler('listkeys', ('namespace',))
1513 @parthandler('listkeys', ('namespace',))
1514 def handlelistkeys(op, inpart):
1514 def handlelistkeys(op, inpart):
1515 """retrieve pushkey namespace content stored in a bundle2"""
1515 """retrieve pushkey namespace content stored in a bundle2"""
1516 namespace = inpart.params['namespace']
1516 namespace = inpart.params['namespace']
1517 r = pushkey.decodekeys(inpart.read())
1517 r = pushkey.decodekeys(inpart.read())
1518 op.records.add('listkeys', (namespace, r))
1518 op.records.add('listkeys', (namespace, r))
1519
1519
1520 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1520 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1521 def handlepushkey(op, inpart):
1521 def handlepushkey(op, inpart):
1522 """process a pushkey request"""
1522 """process a pushkey request"""
1523 dec = pushkey.decode
1523 dec = pushkey.decode
1524 namespace = dec(inpart.params['namespace'])
1524 namespace = dec(inpart.params['namespace'])
1525 key = dec(inpart.params['key'])
1525 key = dec(inpart.params['key'])
1526 old = dec(inpart.params['old'])
1526 old = dec(inpart.params['old'])
1527 new = dec(inpart.params['new'])
1527 new = dec(inpart.params['new'])
1528 # Grab the transaction to ensure that we have the lock before performing the
1528 # Grab the transaction to ensure that we have the lock before performing the
1529 # pushkey.
1529 # pushkey.
1530 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1530 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1531 op.gettransaction()
1531 op.gettransaction()
1532 ret = op.repo.pushkey(namespace, key, old, new)
1532 ret = op.repo.pushkey(namespace, key, old, new)
1533 record = {'namespace': namespace,
1533 record = {'namespace': namespace,
1534 'key': key,
1534 'key': key,
1535 'old': old,
1535 'old': old,
1536 'new': new}
1536 'new': new}
1537 op.records.add('pushkey', record)
1537 op.records.add('pushkey', record)
1538 if op.reply is not None:
1538 if op.reply is not None:
1539 rpart = op.reply.newpart('reply:pushkey')
1539 rpart = op.reply.newpart('reply:pushkey')
1540 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1540 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1541 rpart.addparam('return', '%i' % ret, mandatory=False)
1541 rpart.addparam('return', '%i' % ret, mandatory=False)
1542 if inpart.mandatory and not ret:
1542 if inpart.mandatory and not ret:
1543 kwargs = {}
1543 kwargs = {}
1544 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1544 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1545 if key in inpart.params:
1545 if key in inpart.params:
1546 kwargs[key] = inpart.params[key]
1546 kwargs[key] = inpart.params[key]
1547 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1547 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1548
1548
1549 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1549 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1550 def handlepushkeyreply(op, inpart):
1550 def handlepushkeyreply(op, inpart):
1551 """retrieve the result of a pushkey request"""
1551 """retrieve the result of a pushkey request"""
1552 ret = int(inpart.params['return'])
1552 ret = int(inpart.params['return'])
1553 partid = int(inpart.params['in-reply-to'])
1553 partid = int(inpart.params['in-reply-to'])
1554 op.records.add('pushkey', {'return': ret}, partid)
1554 op.records.add('pushkey', {'return': ret}, partid)
1555
1555
1556 @parthandler('obsmarkers')
1556 @parthandler('obsmarkers')
1557 def handleobsmarker(op, inpart):
1557 def handleobsmarker(op, inpart):
1558 """add a stream of obsmarkers to the repo"""
1558 """add a stream of obsmarkers to the repo"""
1559 tr = op.gettransaction()
1559 tr = op.gettransaction()
1560 markerdata = inpart.read()
1560 markerdata = inpart.read()
1561 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1561 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1562 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1562 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1563 % len(markerdata))
1563 % len(markerdata))
1564 # The mergemarkers call will crash if marker creation is not enabled.
1564 # The mergemarkers call will crash if marker creation is not enabled.
1565 # we want to avoid this if the part is advisory.
1565 # we want to avoid this if the part is advisory.
1566 if not inpart.mandatory and op.repo.obsstore.readonly:
1566 if not inpart.mandatory and op.repo.obsstore.readonly:
1567 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1567 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1568 return
1568 return
1569 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1569 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1570 if new:
1570 if new:
1571 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1571 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1572 op.records.add('obsmarkers', {'new': new})
1572 op.records.add('obsmarkers', {'new': new})
1573 if op.reply is not None:
1573 if op.reply is not None:
1574 rpart = op.reply.newpart('reply:obsmarkers')
1574 rpart = op.reply.newpart('reply:obsmarkers')
1575 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1575 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1576 rpart.addparam('new', '%i' % new, mandatory=False)
1576 rpart.addparam('new', '%i' % new, mandatory=False)
1577
1577
1578
1578
1579 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1579 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1580 def handleobsmarkerreply(op, inpart):
1580 def handleobsmarkerreply(op, inpart):
1581 """retrieve the result of a pushkey request"""
1581 """retrieve the result of a pushkey request"""
1582 ret = int(inpart.params['new'])
1582 ret = int(inpart.params['new'])
1583 partid = int(inpart.params['in-reply-to'])
1583 partid = int(inpart.params['in-reply-to'])
1584 op.records.add('obsmarkers', {'new': ret}, partid)
1584 op.records.add('obsmarkers', {'new': ret}, partid)
1585
1585
1586 @parthandler('hgtagsfnodes')
1586 @parthandler('hgtagsfnodes')
1587 def handlehgtagsfnodes(op, inpart):
1587 def handlehgtagsfnodes(op, inpart):
1588 """Applies .hgtags fnodes cache entries to the local repo.
1588 """Applies .hgtags fnodes cache entries to the local repo.
1589
1589
1590 Payload is pairs of 20 byte changeset nodes and filenodes.
1590 Payload is pairs of 20 byte changeset nodes and filenodes.
1591 """
1591 """
1592 # Grab the transaction so we ensure that we have the lock at this point.
1592 # Grab the transaction so we ensure that we have the lock at this point.
1593 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1593 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1594 op.gettransaction()
1594 op.gettransaction()
1595 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1595 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1596
1596
1597 count = 0
1597 count = 0
1598 while True:
1598 while True:
1599 node = inpart.read(20)
1599 node = inpart.read(20)
1600 fnode = inpart.read(20)
1600 fnode = inpart.read(20)
1601 if len(node) < 20 or len(fnode) < 20:
1601 if len(node) < 20 or len(fnode) < 20:
1602 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1602 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1603 break
1603 break
1604 cache.setfnode(node, fnode)
1604 cache.setfnode(node, fnode)
1605 count += 1
1605 count += 1
1606
1606
1607 cache.write()
1607 cache.write()
1608 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1608 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
General Comments 0
You need to be logged in to leave comments. Login now