bundle2: introduce an "applybundle" function...
Pierre-Yves David
r26790:28a6c2d7 default
@@ -1,1527 +1,1531 @@
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
Payload part
------------------------

The binary format is as follows:

:header size: int32

  The total number of bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

    The header defines how to interpret the part. It contains two pieces of
    data: the part type, and the part parameters.

    The part type is used to route the part to an application level handler
    that can interpret the payload.

    Part parameters are passed to the application level handler. They are
    meant to convey information that will help the application level handler
    interpret the part payload.

    The binary format of the header is as follows:

    :typesize: (one byte)

    :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

    :partid: A 32-bit integer (unique in the bundle) that can be used to refer
             to this part.

    :parameters:

        Part parameters may have arbitrary content; the binary structure is::

            <mandatory-count><advisory-count><param-sizes><param-data>

        :mandatory-count: 1 byte, number of mandatory parameters

        :advisory-count:  1 byte, number of advisory parameters

        :param-sizes:

            N couples of bytes, where N is the total number of parameters.
            Each couple contains (<size-of-key>, <size-of-value>) for one
            parameter.

        :param-data:

            A blob of bytes from which each parameter key and value can be
            retrieved using the list of size couples stored in the previous
            field.

            Mandatory parameters come first, then the advisory ones.

            Each parameter's key MUST be unique within the part.

:payload:

    The payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

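    For illustration only (not part of the module), reading such a payload
    from a file-like object boils down to the loop below; ``fp`` and the
    helper name are assumptions::

        import struct
        def readpayload(fp):
            chunks = []
            # negative sizes are reserved for special processing (none
            # defined yet) and are not handled in this sketch
            chunksize = struct.unpack('>i', fp.read(4))[0]
            while chunksize != 0:
                chunks.append(fp.read(chunksize))
                chunksize = struct.unpack('>i', fp.read(4))[0]
            return ''.join(chunks)
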
Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part type
contains any uppercase char it is considered mandatory. For example, a part of
type `OUTPUT` is mandatory while `output` is advisory; both are routed to the
handler registered for `output`. When no handler is known for a mandatory
part, the process is aborted and an exception is raised. If the part is
advisory and no handler is known, the part is ignored. When the process is
aborted, the full bundle is still read from the stream to keep the channel
usable. But none of the parts read after an abort are processed. In the
future, dropping the stream may become an option for channels we do not care
to preserve.
"""

from __future__ import absolute_import

import errno
import re
import string
import struct
import sys
import urllib

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    pushkey,
    tags,
    url,
    util,
)

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>'+('BB'*nbparams)

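# Illustrative note (not part of the original module): for a part carrying two
# parameters, _makefpartparamsizes(2) returns '>BBBB', which unpacks the four
# size bytes of the parameter block, e.g.:
#
#   import struct
#   struct.unpack(_makefpartparamsizes(2), '\x03\x05\x02\x04')  # (3, 5, 2, 4)
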
parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

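# Illustrative sketch (not part of the original module): how the records
# container behaves; the category names and payloads here are made up.
#
#   records = unbundlerecords()
#   records.add('changegroup', {'return': 1})
#   records.add('output', 'some text')
#   records['changegroup']      # -> ({'return': 1},)
#   list(records)               # -> [('changegroup', {'return': 1}),
#                               #     ('output', 'some text')]
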
class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.gettransaction = transactiongetter
        self.reply = None
        self.captureoutput = captureoutput

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, op=None):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    return processbundle(repo, unbundler, lambda: tr, op=op)

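# Illustrative sketch (not part of this changeset): a caller that already
# holds a transaction can now use applybundle() instead of wiring up a
# transactiongetter itself; the surrounding transaction handling below is an
# assumption, not code from this module.
#
#   unbundler = getunbundler(repo.ui, fp)
#   tr = repo.transaction('unbundle')
#   try:
#       op = applybundle(repo, unbundler, tr)
#       tr.close()
#   finally:
#       tr.release()
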
def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    This is a very early version of this function that will be strongly
    reworked before final usage.

    Unknown mandatory parts will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op.gettransaction is None:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except BaseException as exc:
        for nbpart, part in iterparts:
            # consume the bundle content
            part.seek(0, 2)
        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from exceptions raised while processing the old format.
        # This is mostly needed to handle different return codes to unbundle
        # according to the type of bundle. We should probably clean up or drop
        # this return code craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged
        raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                      params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam('in-reply-to', str(part.id), mandatory=False)
    finally:
        # consume the part content to not corrupt the stream.
        part.seek(0, 2)


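# Illustrative sketch (not part of the original module): a minimal part
# handler as _processpart would dispatch it; the part type, parameter name
# and record category below are made up.
#
#   @parthandler('example:count', ('count',))
#   def handlecount(op, part):
#       count = int(part.params['count'])
#       op.records.add('example', {'count': count})
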
def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urllib.unquote(key)
        vals = [urllib.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urllib.quote(ca)
        vals = [urllib.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

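# Illustrative note (not part of the original module): encodecaps and
# decodecaps round-trip a capabilities dictionary; the capability names below
# are made up.
#
#   caps = {'plaincap': [], 'valuedcap': ['a', 'b']}
#   blob = encodecaps(caps)       # 'plaincap\nvaluedcap=a,b'
#   decodecaps(blob)              # {'plaincap': [], 'valuedcap': ['a', 'b']}
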
class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate the container. Then call `getchunks` to retrieve all the binary
    chunks of data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compressor = util.compressors[None]()

    def setcompression(self, alg):
        """setup core part compression to <alg>"""
        if alg is None:
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compressor = util.compressors[alg]()

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual application payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        # starting compression
        for chunk in self._getcorechunk():
            yield self._compressor.compress(chunk)
        yield self._compressor.flush()

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urllib.quote(par)
            if value is not None:
                value = urllib.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged


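# Illustrative sketch (not part of the original module): assembling an
# outgoing bundle and serializing it; the part type, parameter and output
# file name below are assumptions.
#
#   bundler = bundle20(ui)
#   part = bundler.newpart('output', data='hello')
#   part.addparam('verbosity', 'low', mandatory=False)
#   with open('example.hg2', 'wb') as fp:
#       for chunk in bundler.getchunks():
#           fp.write(chunk)
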
class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))

    def _unpack(self, format):
        """unpack this struct format from the stream"""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream"""
        return changegroup.readexactly(self._fp, size)

    def seek(self, offset, whence=0):
        """move the underlying file pointer"""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def tell(self):
        """return the file offset, or None if file is not seekable"""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` methods."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._decompressor = util.decompressors[None]
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process a raw stream level parameter blob and return it as a dict"""
        params = {}
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urllib.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory and this function will raise an error when they are unknown.

        Note: no options are currently supported. Any input will be either
        ignored or raise an error.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends, relying on the bundle
        being interpreted to find its end. This is terrible and we are sorry,
        but we needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._decompressor is util.decompressors[None]
        # From there, payload might need to be decompressed
        self._fp = self._decompressor(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)


    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._decompressor(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        return False

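# Illustrative sketch (not part of the original module): consuming a bundle2
# stream part by part without applying it; `fp` is an assumption.
#
#   unbundler = getunbundler(ui, fp)
#   for part in unbundler.iterparts():
#       ui.write('%s (mandatory: %s)\n' % (part.type, part.mandatory))
#       part.seek(0, 2)   # consume the payload to keep the stream in sync
#
# To actually apply a bundle to a repository, applybundle()/processbundle()
# above is the entry point.
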
formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.decompressors:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._decompressor = util.decompressors[value]

class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. The remote side
    should be able to safely ignore the advisory ones.

    Neither data nor parameters can be modified after the generation has
    begun.
    """
809
813
810 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
814 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
811 data='', mandatory=True):
815 data='', mandatory=True):
812 validateparttype(parttype)
816 validateparttype(parttype)
813 self.id = None
817 self.id = None
814 self.type = parttype
818 self.type = parttype
815 self._data = data
819 self._data = data
816 self._mandatoryparams = list(mandatoryparams)
820 self._mandatoryparams = list(mandatoryparams)
817 self._advisoryparams = list(advisoryparams)
821 self._advisoryparams = list(advisoryparams)
818 # checking for duplicated entries
822 # checking for duplicated entries
819 self._seenparams = set()
823 self._seenparams = set()
820 for pname, __ in self._mandatoryparams + self._advisoryparams:
824 for pname, __ in self._mandatoryparams + self._advisoryparams:
821 if pname in self._seenparams:
825 if pname in self._seenparams:
822 raise RuntimeError('duplicated params: %s' % pname)
826 raise RuntimeError('duplicated params: %s' % pname)
823 self._seenparams.add(pname)
827 self._seenparams.add(pname)
824 # status of the part's generation:
828 # status of the part's generation:
825 # - None: not started,
829 # - None: not started,
826 # - False: currently generated,
830 # - False: currently generated,
827 # - True: generation done.
831 # - True: generation done.
828 self._generated = None
832 self._generated = None
829 self.mandatory = mandatory
833 self.mandatory = mandatory
830
834
831 def copy(self):
835 def copy(self):
832 """return a copy of the part
836 """return a copy of the part
833
837
834 The new part have the very same content but no partid assigned yet.
838 The new part have the very same content but no partid assigned yet.
835 Parts with generated data cannot be copied."""
839 Parts with generated data cannot be copied."""
836 assert not util.safehasattr(self.data, 'next')
840 assert not util.safehasattr(self.data, 'next')
837 return self.__class__(self.type, self._mandatoryparams,
841 return self.__class__(self.type, self._mandatoryparams,
838 self._advisoryparams, self._data, self.mandatory)
842 self._advisoryparams, self._data, self.mandatory)
839
843
840 # methods used to defines the part content
844 # methods used to defines the part content
841 def __setdata(self, data):
845 def __setdata(self, data):
842 if self._generated is not None:
846 if self._generated is not None:
843 raise error.ReadOnlyPartError('part is being generated')
847 raise error.ReadOnlyPartError('part is being generated')
844 self._data = data
848 self._data = data
845 def __getdata(self):
849 def __getdata(self):
846 return self._data
850 return self._data
847 data = property(__getdata, __setdata)
851 data = property(__getdata, __setdata)
848
852
849 @property
853 @property
850 def mandatoryparams(self):
854 def mandatoryparams(self):
851 # make it an immutable tuple to force people through ``addparam``
855 # make it an immutable tuple to force people through ``addparam``
852 return tuple(self._mandatoryparams)
856 return tuple(self._mandatoryparams)
853
857
854 @property
858 @property
855 def advisoryparams(self):
859 def advisoryparams(self):
856 # make it an immutable tuple to force people through ``addparam``
860 # make it an immutable tuple to force people through ``addparam``
857 return tuple(self._advisoryparams)
861 return tuple(self._advisoryparams)
858
862
859 def addparam(self, name, value='', mandatory=True):
863 def addparam(self, name, value='', mandatory=True):
860 if self._generated is not None:
864 if self._generated is not None:
861 raise error.ReadOnlyPartError('part is being generated')
865 raise error.ReadOnlyPartError('part is being generated')
862 if name in self._seenparams:
866 if name in self._seenparams:
863 raise ValueError('duplicated params: %s' % name)
867 raise ValueError('duplicated params: %s' % name)
864 self._seenparams.add(name)
868 self._seenparams.add(name)
865 params = self._advisoryparams
869 params = self._advisoryparams
866 if mandatory:
870 if mandatory:
867 params = self._mandatoryparams
871 params = self._mandatoryparams
868 params.append((name, value))
872 params.append((name, value))
869
873
    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise RuntimeError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                 ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % exc)
            exc_info = sys.exc_info()
            msg = 'unexpected error: %s' % exc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            raise exc_info[0], exc_info[1], exc_info[2]
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

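    # For reference, the byte stream emitted by getchunks() above for one part
    # is (sizes packed with the _f* struct formats defined earlier in this
    # module):
    #
    #     <header size><header: type size, type, part id,
    #                   param counts, param sizes, param keys/values>
    #     <chunk size><payload chunk>   (repeated)
    #     <chunk size == 0>             (end of payload)
    #
    # A negative chunk size equal to flaginterrupt (defined below) is not
    # data: it announces an out-of-band part spliced into the payload (see
    # the error handling path above and interrupthandler below).
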
    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if util.safehasattr(self.data, 'next'):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


flaginterrupt = -1

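# When a consumer reads a payload chunk size equal to flaginterrupt, the bytes
# that follow are not part data: the producer has spliced in a complete
# out-of-band part (getchunks() above does this to transport an 'error:abort'
# part when payload generation fails). interrupthandler reads that single part
# and processes it against an interruptoperation, which only exposes a ui
# object: no repo access and no transaction.
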
class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting an exception raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise RuntimeError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] #(payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic-related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = dict(self.mandatoryparams)
        self.params.update(dict(self.advisoryparams))
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, super(unbundlepart, self).tell()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            super(unbundlepart, self).seek(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos,
                                             super(unbundlepart, self).tell()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % self.id)
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos
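
    # A small sketch of how the chunk index makes the payload re-readable
    # (illustrative only; it assumes the underlying file object is seekable):
    #
    #     data = part.read()   # records (payload, file) offsets per chunk
    #     part.seek(0)         # rewind using self._chunkindex
    #     assert part.read() == data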

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urllib.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]
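
# Illustrative values only (the exact content depends on the repository and on
# the enabled extensions): getrepocaps() returns the static dict above plus
# dynamic entries such as 'changegroup' (supported changegroup versions) and,
# when obsmarker exchange is enabled, 'obsmarkers'. For example,
# obsmarkersversion({'obsmarkers': ('V0', 'V1')}) == [0, 1].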

@parthandler('changegroup', ('version', 'nbchanges'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will be massively reworked before
    being inflicted on any end-user.
    """
    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    unpacker = changegroup.packermap[unpackerversion][1]
    cg = unpacker(inpart, None)
    # the source and url passed here are overwritten by the ones contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
    ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate that what was
        retrieved by the client matches what the server knows about the bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest with
        that name. Like the size, it is used to validate that what was
        retrieved by the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = cg.apply(op.repo, 'bundle2', 'bundle2')
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()
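
# Illustrative parameter set for a 'remote-changegroup' part (hypothetical
# values): url='https://example.com/pull.hg', size='12345', digests='sha1',
# and 'digest:sha1' set to the 40-character hex digest of the file. Every
# type listed in 'digests' must come with a matching 'digest:<type>' param,
# otherwise the handler above aborts.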

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if heads != op.repo.heads():
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')
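
# The 'check:heads' payload carries no parameters: it is just the raw
# concatenation of 20-byte binary head nodes, read back in 20-byte slices
# above. A sender builds it by concatenating the heads it believes the remote
# repository has (sketch only; the actual emitter lives in exchange.py).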

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(('remote: %s\n' % line))

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise error.Abort(inpart.params['message'], hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))
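
# After the bundle has been processed, callers can pick the decoded listkeys
# content back out of the operation records, e.g. (assuming the operation's
# records object supports lookup by category, as the records.add() calls in
# this module suggest):
#
#     for namespace, content in op.records['listkeys']:
#         ...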

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
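
# Note: the 'namespace', 'key', 'old' and 'new' parameters of a 'pushkey' part
# travel in pushkey-encoded form and are decoded with pushkey.decode() above.
# A bookmark move, for instance, would carry namespace='bookmarks',
# key=<bookmark name> and old/new=<hex changeset ids> (illustrative values).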

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # we want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
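
# Illustrative layout of the 'hgtagsfnodes' payload: a flat byte string of
# concatenated (changeset node, .hgtags filenode) pairs, 20 + 20 bytes per
# entry; the loop above stops at the first incomplete pair.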