applybundle: set 'bundle2=1' env for all transaction...
Pierre-Yves David
r26792:a84e0cac default
@@ -1,1531 +1,1532 @@
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic container to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows:

 - magic string
 - stream level parameters
 - payload parts (any number)
 - end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

The binary format is as follows:

:params size: int32

  The total number of bytes used by the parameters

:params value: arbitrary number of bytes

  A blob of `params size` containing the serialized version of all stream level
  parameters.

  The blob contains a space separated list of parameters. Parameters with a
  value are stored in the form `<name>=<value>`. Both name and value are
  urlquoted.

  Empty names are obviously forbidden.

  Names MUST start with a letter. If this first letter is lower case, the
  parameter is advisory and can be safely ignored. However, when the first
  letter is upper case, the parameter is mandatory and the bundling process
  MUST stop if it is not able to process it.

  Stream parameters use a simple textual format for two main reasons:

  - Stream level parameters should remain simple and we want to discourage any
    crazy usage.
  - Textual data allows easy human inspection of a bundle2 header in case of
    troubles.

  Any applicative level options MUST go into a bundle2 part instead.

Payload part
------------------------

The binary format is as follows:

:header size: int32

  The total number of bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

    The header defines how to interpret the part. It contains two pieces of
    data: the part type, and the part parameters.

    The part type is used to route to an application level handler, that can
    interpret the payload.

    Part parameters are passed to the application level handler. They are
    meant to convey information that will help the application level object to
    interpret the part payload.

    The binary format of the header is as follows:

    :typesize: (one byte)

    :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

    :partid: A 32-bit integer (unique in the bundle) that can be used to refer
             to this part.

    :parameters:

        A part's parameters may have arbitrary content, the binary structure
        is::

            <mandatory-count><advisory-count><param-sizes><param-data>

        :mandatory-count: 1 byte, number of mandatory parameters

        :advisory-count: 1 byte, number of advisory parameters

        :param-sizes:

            N pairs of bytes, where N is the total number of parameters. Each
            pair contains (<size-of-key>, <size-of-value>) for one parameter.

        :param-data:

            A blob of bytes from which each parameter key and value can be
            retrieved using the list of size pairs stored in the previous
            field.

            Mandatory parameters come first, then the advisory ones.

            Each parameter's key MUST be unique within the part.

:payload:

    The payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part type
contains any uppercase character it is considered mandatory. When no handler is
known for a mandatory part, the process is aborted and an exception is raised.
If the part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read after an abort are processed. In the
future, dropping the stream may become an option for channels we do not care to
preserve.
"""

from __future__ import absolute_import

import errno
import re
import string
import struct
import sys
import urllib

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    pushkey,
    tags,
    url,
    util,
)

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

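# The helper below is an illustration only and is not part of Mercurial's
# bundle2 module: it hand-assembles the smallest valid 'HG20' stream
# described in the module docstring, using the struct formats defined above.
def _emptybundle2example():
    """return the bytes of an empty bundle2 container (illustrative sketch)

    The stream is the magic string, an int32 announcing a zero-length
    stream parameter block, and the end of stream marker (an empty part
    header, i.e. a zero int32)."""
    chunks = [
        'HG20',                        # magic string
        _pack(_fstreamparamsize, 0),   # no stream level parameters
        _pack(_fpartheadersize, 0),    # empty part header == end of stream
    ]
    return ''.join(chunks)
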
def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains invalid characters"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>'+('BB'*nbparams)

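# Illustrative sketch (not used by the module itself): with two parameters
# the dynamically built format is '>BBBB', one (key size, value size) byte
# pair per parameter, matching the <param-sizes> field of the part header.
def _partparamsizesexample():
    fmt = _makefpartparamsizes(2)
    assert fmt == '>BBBB'
    # four size bytes unpack to the key/value lengths of both parameters
    return _unpack(fmt, '\x03\x05\x04\x00')   # -> (3, 5, 4, 0)
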
parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

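# A concrete registration to complement the docstring example above. This
# 'x-example:ping' part is hypothetical, not a real Mercurial part type; it
# is spelled in lower case so peers treat it as advisory and may ignore it.
@parthandler('x-example:ping', ('who',))
def _handleexampleping(op, part):
    """record who pinged us (illustrative sketch only)"""
    op.records.add('x-example:ping', part.params.get('who', 'unknown'))
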
class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`. Where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

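# Illustrative sketch of the record keeping above: part handlers store what
# they did and callers inspect the records once the bundle is processed.
def _unbundlerecordsexample():
    records = unbundlerecords()
    records.add('changegroup', {'return': 1})    # what a handler would do
    assert records['changegroup'] == ({'return': 1},)
    assert list(records) == [('changegroup', {'return': 1})]
    return records
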
class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.gettransaction = transactiongetter
        self.reply = None
        self.captureoutput = captureoutput

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, op=None):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    tr.hookargs['bundle2'] = '1'
    return processbundle(repo, unbundler, lambda: tr, op=op)

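# The 'bundle2' entry added to tr.hookargs above is what makes the
# information visible to transaction hooks: Mercurial forwards hookargs to
# in-process Python hooks as keyword arguments and to shell hooks as HG_*
# environment variables (HG_BUNDLE2=1 here). A hypothetical hook using it:
def _examplebundle2hook(ui, repo, **kwargs):
    """pretxnchangegroup hook sketch (illustration only, not part of this
    module): report whether the incoming changes travelled in a bundle2."""
    if kwargs.get('bundle2') == '1':
        ui.note('transaction carries bundle2 data\n')
    return 0    # returning a false value lets the transaction proceed
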
def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying its effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    This is a very early version of this function that will be strongly
    reworked before final usage.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op.gettransaction is None:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except BaseException as exc:
        for nbpart, part in iterparts:
            # consume the bundle content
            part.seek(0, 2)
        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from processing the old format. This is mostly
        # needed to handle different return codes to unbundle according to the
        # type of bundle. We should probably clean up or drop this return code
        # craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged
        raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                      params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam('in-reply-to', str(part.id), mandatory=False)
    finally:
        # consume the part content to not corrupt the stream.
        part.seek(0, 2)


def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urllib.unquote(key)
        vals = [urllib.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urllib.quote(ca)
        vals = [urllib.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

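# Illustrative round trip of the two helpers above: encodecaps and decodecaps
# are inverses for the dictionary-of-lists structure used for capabilities.
def _capsroundtripexample():
    caps = {'HG20': [], 'changegroup': ['01', '02']}
    blob = encodecaps(caps)
    # the blob holds one capability per line, values comma separated:
    #   HG20
    #   changegroup=01,02
    assert decodecaps(blob) == caps
    return blob
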
class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compressor = util.compressors[None]()

    def setcompression(self, alg):
        """setup core part compression to <alg>"""
        if alg is None:
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compressor = util.compressors[alg]()

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        # starting compression
        for chunk in self._getcorechunk():
            yield self._compressor.compress(chunk)
        yield self._compressor.flush()

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urllib.quote(par)
            if value is not None:
                value = urllib.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we preserve
        server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged

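# Illustrative use of the bundler above (a sketch, not a Mercurial API
# entry point): assemble a container holding a single advisory 'output'
# part and write its binary form to an arbitrary file object ``fp``.
def _emitbundleexample(ui, fp):
    bundler = bundle20(ui)
    part = bundler.newpart('output', data='hello from the server\n',
                           mandatory=False)
    part.addparam('in-reply-to', '0', mandatory=False)
    for chunk in bundler.getchunks():
        fp.write(chunk)
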
class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))

    def _unpack(self, format):
        """unpack this struct format from the stream"""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream"""
        return changegroup.readexactly(self._fp, size)

    def seek(self, offset, whence=0):
        """move the underlying file pointer"""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def tell(self):
        """return the file offset, or None if file is not seekable"""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

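# Illustrative counterpart of getunbundler (a sketch, not part of the
# module): parse a bundle2 stream from ``fp``, print each part type and
# drain its payload so the underlying channel stays usable.
def _readbundleexample(ui, fp):
    unbundler = getunbundler(ui, fp)      # checks the magic string first
    for part in unbundler.iterparts():
        ui.write('%s part %i (%i mandatory params)\n'
                 % (part.type, part.id, len(part.mandatorykeys)))
        part.seek(0, 2)                   # consume the part payload
    return unbundler.params
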
class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._decompressor = util.decompressors[None]
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """decode a raw parameter block into a name -> value dictionary"""
        params = {}
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urllib.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory and this function will raise a KeyError when unknown.

        Note: no options are currently supported. Any input will be either
        ignored or failing.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact the 'getbundle' command over 'ssh'
        has no way to know when the reply ends, relying on the bundle to be
        interpreted to know its end. This is terrible and we are sorry, but we
        needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._decompressor is util.decompressors[None]
        # From there, payload might need to be decompressed
        self._fp = self._decompressor(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)


    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._decompressor(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        return False

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.decompressors:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._decompressor = util.decompressors[value]

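# A second, hypothetical stream level parameter handler to illustrate the
# decorator above, in addition to the 'compression' handler: an advisory,
# purely informational parameter that is merely logged.
@b2streamparamhandler('origin')
def _processorigin(unbundler, param, value):
    """record where the bundle claims to come from (illustrative sketch)"""
    indebug(unbundler.ui, 'bundle claims origin: %s' % (value,))
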
798 class bundlepart(object):
799 class bundlepart(object):
799 """A bundle2 part contains application level payload
800 """A bundle2 part contains application level payload
800
801
801 The part `type` is used to route the part to the application level
802 The part `type` is used to route the part to the application level
802 handler.
803 handler.
803
804
804 The part payload is contained in ``part.data``. It could be raw bytes or a
805 The part payload is contained in ``part.data``. It could be raw bytes or a
805 generator of byte chunks.
806 generator of byte chunks.
806
807
807 You can add parameters to the part using the ``addparam`` method.
808 You can add parameters to the part using the ``addparam`` method.
808 Parameters can be either mandatory (default) or advisory. Remote side
809 Parameters can be either mandatory (default) or advisory. Remote side
809 should be able to safely ignore the advisory ones.
810 should be able to safely ignore the advisory ones.
810
811
811 Both data and parameters cannot be modified after the generation has begun.
812 Both data and parameters cannot be modified after the generation has begun.
812 """
813 """
813
814
814 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
815 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
815 data='', mandatory=True):
816 data='', mandatory=True):
816 validateparttype(parttype)
817 validateparttype(parttype)
817 self.id = None
818 self.id = None
818 self.type = parttype
819 self.type = parttype
819 self._data = data
820 self._data = data
820 self._mandatoryparams = list(mandatoryparams)
821 self._mandatoryparams = list(mandatoryparams)
821 self._advisoryparams = list(advisoryparams)
822 self._advisoryparams = list(advisoryparams)
822 # checking for duplicated entries
823 # checking for duplicated entries
823 self._seenparams = set()
824 self._seenparams = set()
824 for pname, __ in self._mandatoryparams + self._advisoryparams:
825 for pname, __ in self._mandatoryparams + self._advisoryparams:
825 if pname in self._seenparams:
826 if pname in self._seenparams:
826 raise RuntimeError('duplicated params: %s' % pname)
827 raise RuntimeError('duplicated params: %s' % pname)
827 self._seenparams.add(pname)
828 self._seenparams.add(pname)
828 # status of the part's generation:
829 # status of the part's generation:
829 # - None: not started,
830 # - None: not started,
830 # - False: currently generated,
831 # - False: currently generated,
831 # - True: generation done.
832 # - True: generation done.
832 self._generated = None
833 self._generated = None
833 self.mandatory = mandatory
834 self.mandatory = mandatory
834
835
835 def copy(self):
836 def copy(self):
836 """return a copy of the part
837 """return a copy of the part
837
838
838 The new part have the very same content but no partid assigned yet.
839 The new part have the very same content but no partid assigned yet.
839 Parts with generated data cannot be copied."""
840 Parts with generated data cannot be copied."""
840 assert not util.safehasattr(self.data, 'next')
841 assert not util.safehasattr(self.data, 'next')
841 return self.__class__(self.type, self._mandatoryparams,
842 return self.__class__(self.type, self._mandatoryparams,
842 self._advisoryparams, self._data, self.mandatory)
843 self._advisoryparams, self._data, self.mandatory)
843
844
844 # methods used to defines the part content
845 # methods used to defines the part content
845 def __setdata(self, data):
846 def __setdata(self, data):
846 if self._generated is not None:
847 if self._generated is not None:
847 raise error.ReadOnlyPartError('part is being generated')
848 raise error.ReadOnlyPartError('part is being generated')
848 self._data = data
849 self._data = data
849 def __getdata(self):
850 def __getdata(self):
850 return self._data
851 return self._data
851 data = property(__getdata, __setdata)
852 data = property(__getdata, __setdata)
852
853
853 @property
854 @property
854 def mandatoryparams(self):
855 def mandatoryparams(self):
855 # make it an immutable tuple to force people through ``addparam``
856 # make it an immutable tuple to force people through ``addparam``
856 return tuple(self._mandatoryparams)
857 return tuple(self._mandatoryparams)
857
858
858 @property
859 @property
859 def advisoryparams(self):
860 def advisoryparams(self):
860 # make it an immutable tuple to force people through ``addparam``
861 # make it an immutable tuple to force people through ``addparam``
861 return tuple(self._advisoryparams)
862 return tuple(self._advisoryparams)
862
863
863 def addparam(self, name, value='', mandatory=True):
864 def addparam(self, name, value='', mandatory=True):
864 if self._generated is not None:
865 if self._generated is not None:
865 raise error.ReadOnlyPartError('part is being generated')
866 raise error.ReadOnlyPartError('part is being generated')
866 if name in self._seenparams:
867 if name in self._seenparams:
867 raise ValueError('duplicated params: %s' % name)
868 raise ValueError('duplicated params: %s' % name)
868 self._seenparams.add(name)
869 self._seenparams.add(name)
869 params = self._advisoryparams
870 params = self._advisoryparams
870 if mandatory:
871 if mandatory:
871 params = self._mandatoryparams
872 params = self._mandatoryparams
872 params.append((name, value))
873 params.append((name, value))
873
874
874 # methods used to generate the bundle2 stream
875 # methods used to generate the bundle2 stream
875 def getchunks(self, ui):
876 def getchunks(self, ui):
876 if self._generated is not None:
877 if self._generated is not None:
877 raise RuntimeError('part can only be consumed once')
878 raise RuntimeError('part can only be consumed once')
878 self._generated = False
879 self._generated = False
879
880
880 if ui.debugflag:
881 if ui.debugflag:
881 msg = ['bundle2-output-part: "%s"' % self.type]
882 msg = ['bundle2-output-part: "%s"' % self.type]
882 if not self.mandatory:
883 if not self.mandatory:
883 msg.append(' (advisory)')
884 msg.append(' (advisory)')
884 nbmp = len(self.mandatoryparams)
885 nbmp = len(self.mandatoryparams)
885 nbap = len(self.advisoryparams)
886 nbap = len(self.advisoryparams)
886 if nbmp or nbap:
887 if nbmp or nbap:
887 msg.append(' (params:')
888 msg.append(' (params:')
888 if nbmp:
889 if nbmp:
889 msg.append(' %i mandatory' % nbmp)
890 msg.append(' %i mandatory' % nbmp)
890 if nbap:
891 if nbap:
891 msg.append(' %i advisory' % nbap)
892 msg.append(' %i advisory' % nbap)
892 msg.append(')')
893 msg.append(')')
893 if not self.data:
894 if not self.data:
894 msg.append(' empty payload')
895 msg.append(' empty payload')
895 elif util.safehasattr(self.data, 'next'):
896 elif util.safehasattr(self.data, 'next'):
896 msg.append(' streamed payload')
897 msg.append(' streamed payload')
897 else:
898 else:
898 msg.append(' %i bytes payload' % len(self.data))
899 msg.append(' %i bytes payload' % len(self.data))
899 msg.append('\n')
900 msg.append('\n')
900 ui.debug(''.join(msg))
901 ui.debug(''.join(msg))
901
902
902 #### header
903 #### header
903 if self.mandatory:
904 if self.mandatory:
904 parttype = self.type.upper()
905 parttype = self.type.upper()
905 else:
906 else:
906 parttype = self.type.lower()
907 parttype = self.type.lower()
907 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
908 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
908 ## parttype
909 ## parttype
909 header = [_pack(_fparttypesize, len(parttype)),
910 header = [_pack(_fparttypesize, len(parttype)),
910 parttype, _pack(_fpartid, self.id),
911 parttype, _pack(_fpartid, self.id),
911 ]
912 ]
912 ## parameters
913 ## parameters
913 # count
914 # count
914 manpar = self.mandatoryparams
915 manpar = self.mandatoryparams
915 advpar = self.advisoryparams
916 advpar = self.advisoryparams
916 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
917 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
917 # size
918 # size
918 parsizes = []
919 parsizes = []
919 for key, value in manpar:
920 for key, value in manpar:
920 parsizes.append(len(key))
921 parsizes.append(len(key))
921 parsizes.append(len(value))
922 parsizes.append(len(value))
922 for key, value in advpar:
923 for key, value in advpar:
923 parsizes.append(len(key))
924 parsizes.append(len(key))
924 parsizes.append(len(value))
925 parsizes.append(len(value))
925 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
926 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
926 header.append(paramsizes)
927 header.append(paramsizes)
927 # key, value
928 # key, value
928 for key, value in manpar:
929 for key, value in manpar:
929 header.append(key)
930 header.append(key)
930 header.append(value)
931 header.append(value)
931 for key, value in advpar:
932 for key, value in advpar:
932 header.append(key)
933 header.append(key)
933 header.append(value)
934 header.append(value)
934 ## finalize header
935 ## finalize header
935 headerchunk = ''.join(header)
936 headerchunk = ''.join(header)
936 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
937 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
937 yield _pack(_fpartheadersize, len(headerchunk))
938 yield _pack(_fpartheadersize, len(headerchunk))
938 yield headerchunk
939 yield headerchunk
939 ## payload
940 ## payload
940 try:
941 try:
941 for chunk in self._payloadchunks():
942 for chunk in self._payloadchunks():
942 outdebug(ui, 'payload chunk size: %i' % len(chunk))
943 outdebug(ui, 'payload chunk size: %i' % len(chunk))
943 yield _pack(_fpayloadsize, len(chunk))
944 yield _pack(_fpayloadsize, len(chunk))
944 yield chunk
945 yield chunk
945 except GeneratorExit:
946 except GeneratorExit:
946 # GeneratorExit means that nobody is listening for our
947 # GeneratorExit means that nobody is listening for our
947 # results anyway, so just bail quickly rather than trying
948 # results anyway, so just bail quickly rather than trying
948 # to produce an error part.
949 # to produce an error part.
949 ui.debug('bundle2-generatorexit\n')
950 ui.debug('bundle2-generatorexit\n')
950 raise
951 raise
951 except BaseException as exc:
952 except BaseException as exc:
952 # backup exception data for later
953 # backup exception data for later
953 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
954 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
954 % exc)
955 % exc)
955 exc_info = sys.exc_info()
956 exc_info = sys.exc_info()
956 msg = 'unexpected error: %s' % exc
957 msg = 'unexpected error: %s' % exc
957 interpart = bundlepart('error:abort', [('message', msg)],
958 interpart = bundlepart('error:abort', [('message', msg)],
958 mandatory=False)
959 mandatory=False)
959 interpart.id = 0
960 interpart.id = 0
960 yield _pack(_fpayloadsize, -1)
961 yield _pack(_fpayloadsize, -1)
961 for chunk in interpart.getchunks(ui=ui):
962 for chunk in interpart.getchunks(ui=ui):
962 yield chunk
963 yield chunk
963 outdebug(ui, 'closing payload chunk')
964 outdebug(ui, 'closing payload chunk')
964 # abort current part payload
965 # abort current part payload
965 yield _pack(_fpayloadsize, 0)
966 yield _pack(_fpayloadsize, 0)
966 raise exc_info[0], exc_info[1], exc_info[2]
967 raise exc_info[0], exc_info[1], exc_info[2]
967 # end of payload
968 # end of payload
968 outdebug(ui, 'closing payload chunk')
969 outdebug(ui, 'closing payload chunk')
969 yield _pack(_fpayloadsize, 0)
970 yield _pack(_fpayloadsize, 0)
970 self._generated = True
971 self._generated = True
971
972
972 def _payloadchunks(self):
973 def _payloadchunks(self):
973 """yield chunks of the part payload
974 """yield chunks of the part payload
974
975
975 Exists to handle the different methods to provide data to a part."""
976 Exists to handle the different methods to provide data to a part."""
976 # we only support fixed size data now.
977 # we only support fixed size data now.
977 # This will be improved in the future.
978 # This will be improved in the future.
978 if util.safehasattr(self.data, 'next'):
979 if util.safehasattr(self.data, 'next'):
979 buff = util.chunkbuffer(self.data)
980 buff = util.chunkbuffer(self.data)
980 chunk = buff.read(preferedchunksize)
981 chunk = buff.read(preferedchunksize)
981 while chunk:
982 while chunk:
982 yield chunk
983 yield chunk
983 chunk = buff.read(preferedchunksize)
984 chunk = buff.read(preferedchunksize)
984 elif len(self.data):
985 elif len(self.data):
985 yield self.data
986 yield self.data
986
987
987
988
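# Illustrative sketch (not taken from the diff above): how parameters are
# attached to a ``bundlepart`` before generation. It assumes the constructor
# shape implied by ``copy()`` above (parttype, mandatoryparams,
# advisoryparams, data, mandatory); ``addparam`` refuses new parameters once
# ``getchunks`` has started, and the two ``*params`` properties expose
# read-only tuples.
def _bundlepart_param_sketch():
    part = bundlepart('output', data='hello')
    part.addparam('client', 'hg', mandatory=False)  # advisory parameter
    part.addparam('version', '1')                   # mandatory (the default)
    # -> ((('version', '1'),), (('client', 'hg'),))
    return part.mandatoryparams, part.advisoryparams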
988 flaginterrupt = -1
989 flaginterrupt = -1
989
990
990 class interrupthandler(unpackermixin):
991 class interrupthandler(unpackermixin):
991 """read one part and process it with restricted capability
992 """read one part and process it with restricted capability
992
993
993 This allows exceptions raised on the producer side during part iteration
994 This allows exceptions raised on the producer side during part iteration
994 to be transmitted while the consumer is reading a part.
995 to be transmitted while the consumer is reading a part.
995
996
996 Parts processed in this manner only have access to a ui object."""
997 Parts processed in this manner only have access to a ui object."""
997
998
998 def __init__(self, ui, fp):
999 def __init__(self, ui, fp):
999 super(interrupthandler, self).__init__(fp)
1000 super(interrupthandler, self).__init__(fp)
1000 self.ui = ui
1001 self.ui = ui
1001
1002
1002 def _readpartheader(self):
1003 def _readpartheader(self):
1003 """reads a part header size and returns the bytes blob
1004 """reads a part header size and returns the bytes blob
1004
1005
1005 returns None if empty"""
1006 returns None if empty"""
1006 headersize = self._unpack(_fpartheadersize)[0]
1007 headersize = self._unpack(_fpartheadersize)[0]
1007 if headersize < 0:
1008 if headersize < 0:
1008 raise error.BundleValueError('negative part header size: %i'
1009 raise error.BundleValueError('negative part header size: %i'
1009 % headersize)
1010 % headersize)
1010 indebug(self.ui, 'part header size: %i' % headersize)
1011 indebug(self.ui, 'part header size: %i' % headersize)
1011 if headersize:
1012 if headersize:
1012 return self._readexact(headersize)
1013 return self._readexact(headersize)
1013 return None
1014 return None
1014
1015
1015 def __call__(self):
1016 def __call__(self):
1016
1017
1017 self.ui.debug('bundle2-input-stream-interrupt:'
1018 self.ui.debug('bundle2-input-stream-interrupt:'
1018 ' opening out of band context\n')
1019 ' opening out of band context\n')
1019 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1020 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1020 headerblock = self._readpartheader()
1021 headerblock = self._readpartheader()
1021 if headerblock is None:
1022 if headerblock is None:
1022 indebug(self.ui, 'no part found during interruption.')
1023 indebug(self.ui, 'no part found during interruption.')
1023 return
1024 return
1024 part = unbundlepart(self.ui, headerblock, self._fp)
1025 part = unbundlepart(self.ui, headerblock, self._fp)
1025 op = interruptoperation(self.ui)
1026 op = interruptoperation(self.ui)
1026 _processpart(op, part)
1027 _processpart(op, part)
1027 self.ui.debug('bundle2-input-stream-interrupt:'
1028 self.ui.debug('bundle2-input-stream-interrupt:'
1028 ' closing out of band context\n')
1029 ' closing out of band context\n')
1029
1030
1030 class interruptoperation(object):
1031 class interruptoperation(object):
1031 """A limited operation to be used by part handlers during interruption
1032 """A limited operation to be used by part handlers during interruption
1032
1033
1033 It only has access to a ui object.
1034 It only has access to a ui object.
1034 """
1035 """
1035
1036
1036 def __init__(self, ui):
1037 def __init__(self, ui):
1037 self.ui = ui
1038 self.ui = ui
1038 self.reply = None
1039 self.reply = None
1039 self.captureoutput = False
1040 self.captureoutput = False
1040
1041
1041 @property
1042 @property
1042 def repo(self):
1043 def repo(self):
1043 raise RuntimeError('no repo access from stream interruption')
1044 raise RuntimeError('no repo access from stream interruption')
1044
1045
1045 def gettransaction(self):
1046 def gettransaction(self):
1046 raise TransactionUnavailable('no repo access from stream interruption')
1047 raise TransactionUnavailable('no repo access from stream interruption')
1047
1048
1048 class unbundlepart(unpackermixin):
1049 class unbundlepart(unpackermixin):
1049 """a bundle part read from a bundle"""
1050 """a bundle part read from a bundle"""
1050
1051
1051 def __init__(self, ui, header, fp):
1052 def __init__(self, ui, header, fp):
1052 super(unbundlepart, self).__init__(fp)
1053 super(unbundlepart, self).__init__(fp)
1053 self.ui = ui
1054 self.ui = ui
1054 # unbundle state attr
1055 # unbundle state attr
1055 self._headerdata = header
1056 self._headerdata = header
1056 self._headeroffset = 0
1057 self._headeroffset = 0
1057 self._initialized = False
1058 self._initialized = False
1058 self.consumed = False
1059 self.consumed = False
1059 # part data
1060 # part data
1060 self.id = None
1061 self.id = None
1061 self.type = None
1062 self.type = None
1062 self.mandatoryparams = None
1063 self.mandatoryparams = None
1063 self.advisoryparams = None
1064 self.advisoryparams = None
1064 self.params = None
1065 self.params = None
1065 self.mandatorykeys = ()
1066 self.mandatorykeys = ()
1066 self._payloadstream = None
1067 self._payloadstream = None
1067 self._readheader()
1068 self._readheader()
1068 self._mandatory = None
1069 self._mandatory = None
1069 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1070 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1070 self._pos = 0
1071 self._pos = 0
1071
1072
1072 def _fromheader(self, size):
1073 def _fromheader(self, size):
1073 """return the next <size> bytes from the header"""
1074 """return the next <size> bytes from the header"""
1074 offset = self._headeroffset
1075 offset = self._headeroffset
1075 data = self._headerdata[offset:(offset + size)]
1076 data = self._headerdata[offset:(offset + size)]
1076 self._headeroffset = offset + size
1077 self._headeroffset = offset + size
1077 return data
1078 return data
1078
1079
1079 def _unpackheader(self, format):
1080 def _unpackheader(self, format):
1080 """read given format from header
1081 """read given format from header
1081
1082
1082 This automatically computes the size of the format to read."""
1083 This automatically computes the size of the format to read."""
1083 data = self._fromheader(struct.calcsize(format))
1084 data = self._fromheader(struct.calcsize(format))
1084 return _unpack(format, data)
1085 return _unpack(format, data)
1085
1086
1086 def _initparams(self, mandatoryparams, advisoryparams):
1087 def _initparams(self, mandatoryparams, advisoryparams):
1087 """internal function to set up all logic-related parameters"""
1088 """internal function to set up all logic-related parameters"""
1088 # make it read only to prevent people touching it by mistake.
1089 # make it read only to prevent people touching it by mistake.
1089 self.mandatoryparams = tuple(mandatoryparams)
1090 self.mandatoryparams = tuple(mandatoryparams)
1090 self.advisoryparams = tuple(advisoryparams)
1091 self.advisoryparams = tuple(advisoryparams)
1091 # user friendly UI
1092 # user friendly UI
1092 self.params = dict(self.mandatoryparams)
1093 self.params = dict(self.mandatoryparams)
1093 self.params.update(dict(self.advisoryparams))
1094 self.params.update(dict(self.advisoryparams))
1094 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1095 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1095
1096
1096 def _payloadchunks(self, chunknum=0):
1097 def _payloadchunks(self, chunknum=0):
1097 '''seek to specified chunk and start yielding data'''
1098 '''seek to specified chunk and start yielding data'''
1098 if len(self._chunkindex) == 0:
1099 if len(self._chunkindex) == 0:
1099 assert chunknum == 0, 'Must start with chunk 0'
1100 assert chunknum == 0, 'Must start with chunk 0'
1100 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1101 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1101 else:
1102 else:
1102 assert chunknum < len(self._chunkindex), \
1103 assert chunknum < len(self._chunkindex), \
1103 'Unknown chunk %d' % chunknum
1104 'Unknown chunk %d' % chunknum
1104 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1105 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1105
1106
1106 pos = self._chunkindex[chunknum][0]
1107 pos = self._chunkindex[chunknum][0]
1107 payloadsize = self._unpack(_fpayloadsize)[0]
1108 payloadsize = self._unpack(_fpayloadsize)[0]
1108 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1109 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1109 while payloadsize:
1110 while payloadsize:
1110 if payloadsize == flaginterrupt:
1111 if payloadsize == flaginterrupt:
1111 # interruption detection, the handler will now read a
1112 # interruption detection, the handler will now read a
1112 # single part and process it.
1113 # single part and process it.
1113 interrupthandler(self.ui, self._fp)()
1114 interrupthandler(self.ui, self._fp)()
1114 elif payloadsize < 0:
1115 elif payloadsize < 0:
1115 msg = 'negative payload chunk size: %i' % payloadsize
1116 msg = 'negative payload chunk size: %i' % payloadsize
1116 raise error.BundleValueError(msg)
1117 raise error.BundleValueError(msg)
1117 else:
1118 else:
1118 result = self._readexact(payloadsize)
1119 result = self._readexact(payloadsize)
1119 chunknum += 1
1120 chunknum += 1
1120 pos += payloadsize
1121 pos += payloadsize
1121 if chunknum == len(self._chunkindex):
1122 if chunknum == len(self._chunkindex):
1122 self._chunkindex.append((pos,
1123 self._chunkindex.append((pos,
1123 super(unbundlepart, self).tell()))
1124 super(unbundlepart, self).tell()))
1124 yield result
1125 yield result
1125 payloadsize = self._unpack(_fpayloadsize)[0]
1126 payloadsize = self._unpack(_fpayloadsize)[0]
1126 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1127 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1127
1128
1128 def _findchunk(self, pos):
1129 def _findchunk(self, pos):
1129 '''for a given payload position, return a chunk number and offset'''
1130 '''for a given payload position, return a chunk number and offset'''
1130 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1131 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1131 if ppos == pos:
1132 if ppos == pos:
1132 return chunk, 0
1133 return chunk, 0
1133 elif ppos > pos:
1134 elif ppos > pos:
1134 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1135 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1135 raise ValueError('Unknown chunk')
1136 raise ValueError('Unknown chunk')
1136
1137
1137 def _readheader(self):
1138 def _readheader(self):
1138 """read the header and setup the object"""
1139 """read the header and setup the object"""
1139 typesize = self._unpackheader(_fparttypesize)[0]
1140 typesize = self._unpackheader(_fparttypesize)[0]
1140 self.type = self._fromheader(typesize)
1141 self.type = self._fromheader(typesize)
1141 indebug(self.ui, 'part type: "%s"' % self.type)
1142 indebug(self.ui, 'part type: "%s"' % self.type)
1142 self.id = self._unpackheader(_fpartid)[0]
1143 self.id = self._unpackheader(_fpartid)[0]
1143 indebug(self.ui, 'part id: "%s"' % self.id)
1144 indebug(self.ui, 'part id: "%s"' % self.id)
1144 # extract mandatory bit from type
1145 # extract mandatory bit from type
1145 self.mandatory = (self.type != self.type.lower())
1146 self.mandatory = (self.type != self.type.lower())
1146 self.type = self.type.lower()
1147 self.type = self.type.lower()
1147 ## reading parameters
1148 ## reading parameters
1148 # param count
1149 # param count
1149 mancount, advcount = self._unpackheader(_fpartparamcount)
1150 mancount, advcount = self._unpackheader(_fpartparamcount)
1150 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1151 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1151 # param size
1152 # param size
1152 fparamsizes = _makefpartparamsizes(mancount + advcount)
1153 fparamsizes = _makefpartparamsizes(mancount + advcount)
1153 paramsizes = self._unpackheader(fparamsizes)
1154 paramsizes = self._unpackheader(fparamsizes)
1154 # make it a list of pairs again
1155 # make it a list of pairs again
1155 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1156 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1156 # split mandatory from advisory
1157 # split mandatory from advisory
1157 mansizes = paramsizes[:mancount]
1158 mansizes = paramsizes[:mancount]
1158 advsizes = paramsizes[mancount:]
1159 advsizes = paramsizes[mancount:]
1159 # retrieve param value
1160 # retrieve param value
1160 manparams = []
1161 manparams = []
1161 for key, value in mansizes:
1162 for key, value in mansizes:
1162 manparams.append((self._fromheader(key), self._fromheader(value)))
1163 manparams.append((self._fromheader(key), self._fromheader(value)))
1163 advparams = []
1164 advparams = []
1164 for key, value in advsizes:
1165 for key, value in advsizes:
1165 advparams.append((self._fromheader(key), self._fromheader(value)))
1166 advparams.append((self._fromheader(key), self._fromheader(value)))
1166 self._initparams(manparams, advparams)
1167 self._initparams(manparams, advparams)
1167 ## part payload
1168 ## part payload
1168 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1169 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1169 # we read the data, tell it
1170 # we read the data, tell it
1170 self._initialized = True
1171 self._initialized = True
1171
1172
1172 def read(self, size=None):
1173 def read(self, size=None):
1173 """read payload data"""
1174 """read payload data"""
1174 if not self._initialized:
1175 if not self._initialized:
1175 self._readheader()
1176 self._readheader()
1176 if size is None:
1177 if size is None:
1177 data = self._payloadstream.read()
1178 data = self._payloadstream.read()
1178 else:
1179 else:
1179 data = self._payloadstream.read(size)
1180 data = self._payloadstream.read(size)
1180 self._pos += len(data)
1181 self._pos += len(data)
1181 if size is None or len(data) < size:
1182 if size is None or len(data) < size:
1182 if not self.consumed and self._pos:
1183 if not self.consumed and self._pos:
1183 self.ui.debug('bundle2-input-part: total payload size %i\n'
1184 self.ui.debug('bundle2-input-part: total payload size %i\n'
1184 % self._pos)
1185 % self._pos)
1185 self.consumed = True
1186 self.consumed = True
1186 return data
1187 return data
1187
1188
1188 def tell(self):
1189 def tell(self):
1189 return self._pos
1190 return self._pos
1190
1191
1191 def seek(self, offset, whence=0):
1192 def seek(self, offset, whence=0):
1192 if whence == 0:
1193 if whence == 0:
1193 newpos = offset
1194 newpos = offset
1194 elif whence == 1:
1195 elif whence == 1:
1195 newpos = self._pos + offset
1196 newpos = self._pos + offset
1196 elif whence == 2:
1197 elif whence == 2:
1197 if not self.consumed:
1198 if not self.consumed:
1198 self.read()
1199 self.read()
1199 newpos = self._chunkindex[-1][0] - offset
1200 newpos = self._chunkindex[-1][0] - offset
1200 else:
1201 else:
1201 raise ValueError('Unknown whence value: %r' % (whence,))
1202 raise ValueError('Unknown whence value: %r' % (whence,))
1202
1203
1203 if newpos > self._chunkindex[-1][0] and not self.consumed:
1204 if newpos > self._chunkindex[-1][0] and not self.consumed:
1204 self.read()
1205 self.read()
1205 if not 0 <= newpos <= self._chunkindex[-1][0]:
1206 if not 0 <= newpos <= self._chunkindex[-1][0]:
1206 raise ValueError('Offset out of range')
1207 raise ValueError('Offset out of range')
1207
1208
1208 if self._pos != newpos:
1209 if self._pos != newpos:
1209 chunk, internaloffset = self._findchunk(newpos)
1210 chunk, internaloffset = self._findchunk(newpos)
1210 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1211 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1211 adjust = self.read(internaloffset)
1212 adjust = self.read(internaloffset)
1212 if len(adjust) != internaloffset:
1213 if len(adjust) != internaloffset:
1213 raise error.Abort(_('Seek failed\n'))
1214 raise error.Abort(_('Seek failed\n'))
1214 self._pos = newpos
1215 self._pos = newpos
1215
1216
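# Illustrative sketch of the bookkeeping behind ``unbundlepart.seek``:
# ``_chunkindex`` stores a (payload offset, file offset) pair for the start
# of every chunk read so far, so a payload position can be translated into a
# chunk number plus an offset inside that chunk, mirroring ``_findchunk``
# above. The index values below are made up for the example.
def _findchunk_sketch(pos):
    chunkindex = [(0, 100), (4096, 4200), (8192, 8300)]  # toy index
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0
        elif ppos > pos:
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')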
1216 # These are only the static capabilities.
1217 # These are only the static capabilities.
1217 # Check the 'getrepocaps' function for the rest.
1218 # Check the 'getrepocaps' function for the rest.
1218 capabilities = {'HG20': (),
1219 capabilities = {'HG20': (),
1219 'error': ('abort', 'unsupportedcontent', 'pushraced',
1220 'error': ('abort', 'unsupportedcontent', 'pushraced',
1220 'pushkey'),
1221 'pushkey'),
1221 'listkeys': (),
1222 'listkeys': (),
1222 'pushkey': (),
1223 'pushkey': (),
1223 'digests': tuple(sorted(util.DIGESTS.keys())),
1224 'digests': tuple(sorted(util.DIGESTS.keys())),
1224 'remote-changegroup': ('http', 'https'),
1225 'remote-changegroup': ('http', 'https'),
1225 'hgtagsfnodes': (),
1226 'hgtagsfnodes': (),
1226 }
1227 }
1227
1228
1228 def getrepocaps(repo, allowpushback=False):
1229 def getrepocaps(repo, allowpushback=False):
1229 """return the bundle2 capabilities for a given repo
1230 """return the bundle2 capabilities for a given repo
1230
1231
1231 Exists to allow extensions (like evolution) to mutate the capabilities.
1232 Exists to allow extensions (like evolution) to mutate the capabilities.
1232 """
1233 """
1233 caps = capabilities.copy()
1234 caps = capabilities.copy()
1234 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1235 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1235 if obsolete.isenabled(repo, obsolete.exchangeopt):
1236 if obsolete.isenabled(repo, obsolete.exchangeopt):
1236 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1237 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1237 caps['obsmarkers'] = supportedformat
1238 caps['obsmarkers'] = supportedformat
1238 if allowpushback:
1239 if allowpushback:
1239 caps['pushback'] = ()
1240 caps['pushback'] = ()
1240 return caps
1241 return caps
1241
1242
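# Minimal sketch of how these capabilities travel over the wire, assuming the
# ``encodecaps``/``decodecaps`` helpers defined earlier in this module: the
# server advertises the (urlquoted) blob under its 'bundle2' wire capability
# and ``bundle2caps`` below turns it back into a dict on the client.
def _capsroundtrip_sketch(repo):
    blob = encodecaps(getrepocaps(repo))
    return decodecaps(blob)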
1242 def bundle2caps(remote):
1243 def bundle2caps(remote):
1243 """return the bundle capabilities of a peer as dict"""
1244 """return the bundle capabilities of a peer as dict"""
1244 raw = remote.capable('bundle2')
1245 raw = remote.capable('bundle2')
1245 if not raw and raw != '':
1246 if not raw and raw != '':
1246 return {}
1247 return {}
1247 capsblob = urllib.unquote(remote.capable('bundle2'))
1248 capsblob = urllib.unquote(remote.capable('bundle2'))
1248 return decodecaps(capsblob)
1249 return decodecaps(capsblob)
1249
1250
1250 def obsmarkersversion(caps):
1251 def obsmarkersversion(caps):
1251 """extract the list of supported obsmarkers versions from a bundle2caps dict
1252 """extract the list of supported obsmarkers versions from a bundle2caps dict
1252 """
1253 """
1253 obscaps = caps.get('obsmarkers', ())
1254 obscaps = caps.get('obsmarkers', ())
1254 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1255 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1255
1256
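# Small worked example for ``obsmarkersversion``: the capability advertises
# 'V<n>' strings and the helper returns the bare integers.
def _obsmarkersversion_example():
    caps = {'obsmarkers': ('V0', 'V1')}
    return obsmarkersversion(caps)  # -> [0, 1]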
1256 @parthandler('changegroup', ('version', 'nbchanges'))
1257 @parthandler('changegroup', ('version', 'nbchanges'))
1257 def handlechangegroup(op, inpart):
1258 def handlechangegroup(op, inpart):
1258 """apply a changegroup part on the repo
1259 """apply a changegroup part on the repo
1259
1260
1260 This is a very early implementation that will be massively reworked before
1261 This is a very early implementation that will be massively reworked before
1261 being inflicted on any end-user.
1262 being inflicted on any end-user.
1262 """
1263 """
1263 # Make sure we trigger a transaction creation
1264 # Make sure we trigger a transaction creation
1264 #
1265 #
1265 # The addchangegroup function will get a transaction object by itself, but
1266 # The addchangegroup function will get a transaction object by itself, but
1266 # we need to make sure we trigger the creation of a transaction object used
1267 # we need to make sure we trigger the creation of a transaction object used
1267 # for the whole processing scope.
1268 # for the whole processing scope.
1268 op.gettransaction()
1269 op.gettransaction()
1269 unpackerversion = inpart.params.get('version', '01')
1270 unpackerversion = inpart.params.get('version', '01')
1270 # We should raise an appropriate exception here
1271 # We should raise an appropriate exception here
1271 unpacker = changegroup.packermap[unpackerversion][1]
1272 unpacker = changegroup.packermap[unpackerversion][1]
1272 cg = unpacker(inpart, None)
1273 cg = unpacker(inpart, None)
1273 # the source and url passed here are overwritten by the ones contained in
1274 # the source and url passed here are overwritten by the ones contained in
1274 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1275 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1275 nbchangesets = None
1276 nbchangesets = None
1276 if 'nbchanges' in inpart.params:
1277 if 'nbchanges' in inpart.params:
1277 nbchangesets = int(inpart.params.get('nbchanges'))
1278 nbchangesets = int(inpart.params.get('nbchanges'))
1278 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1279 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1279 op.records.add('changegroup', {'return': ret})
1280 op.records.add('changegroup', {'return': ret})
1280 if op.reply is not None:
1281 if op.reply is not None:
1281 # This is definitely not the final form of this
1282 # This is definitely not the final form of this
1282 # return. But one needs to start somewhere.
1283 # return. But one needs to start somewhere.
1283 part = op.reply.newpart('reply:changegroup', mandatory=False)
1284 part = op.reply.newpart('reply:changegroup', mandatory=False)
1284 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1285 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1285 part.addparam('return', '%i' % ret, mandatory=False)
1286 part.addparam('return', '%i' % ret, mandatory=False)
1286 assert not inpart.read()
1287 assert not inpart.read()
1287
1288
1288 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1289 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1289 ['digest:%s' % k for k in util.DIGESTS.keys()])
1290 ['digest:%s' % k for k in util.DIGESTS.keys()])
1290 @parthandler('remote-changegroup', _remotechangegroupparams)
1291 @parthandler('remote-changegroup', _remotechangegroupparams)
1291 def handleremotechangegroup(op, inpart):
1292 def handleremotechangegroup(op, inpart):
1292 """apply a bundle10 on the repo, given an url and validation information
1293 """apply a bundle10 on the repo, given an url and validation information
1293
1294
1294 All the information about the remote bundle to import is given as
1295 All the information about the remote bundle to import is given as
1295 parameters. The parameters include:
1296 parameters. The parameters include:
1296 - url: the url to the bundle10.
1297 - url: the url to the bundle10.
1297 - size: the bundle10 file size. It is used to validate what was
1298 - size: the bundle10 file size. It is used to validate what was
1298 retrieved by the client matches the server knowledge about the bundle.
1299 retrieved by the client matches the server knowledge about the bundle.
1299 - digests: a space separated list of the digest types provided as
1300 - digests: a space separated list of the digest types provided as
1300 parameters.
1301 parameters.
1301 - digest:<digest-type>: the hexadecimal representation of the digest with
1302 - digest:<digest-type>: the hexadecimal representation of the digest with
1302 that name. Like the size, it is used to validate what was retrieved by
1303 that name. Like the size, it is used to validate what was retrieved by
1303 the client matches what the server knows about the bundle.
1304 the client matches what the server knows about the bundle.
1304
1305
1305 When multiple digest types are given, all of them are checked.
1306 When multiple digest types are given, all of them are checked.
1306 """
1307 """
1307 try:
1308 try:
1308 raw_url = inpart.params['url']
1309 raw_url = inpart.params['url']
1309 except KeyError:
1310 except KeyError:
1310 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1311 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1311 parsed_url = util.url(raw_url)
1312 parsed_url = util.url(raw_url)
1312 if parsed_url.scheme not in capabilities['remote-changegroup']:
1313 if parsed_url.scheme not in capabilities['remote-changegroup']:
1313 raise error.Abort(_('remote-changegroup does not support %s urls') %
1314 raise error.Abort(_('remote-changegroup does not support %s urls') %
1314 parsed_url.scheme)
1315 parsed_url.scheme)
1315
1316
1316 try:
1317 try:
1317 size = int(inpart.params['size'])
1318 size = int(inpart.params['size'])
1318 except ValueError:
1319 except ValueError:
1319 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1320 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1320 % 'size')
1321 % 'size')
1321 except KeyError:
1322 except KeyError:
1322 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1323 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1323
1324
1324 digests = {}
1325 digests = {}
1325 for typ in inpart.params.get('digests', '').split():
1326 for typ in inpart.params.get('digests', '').split():
1326 param = 'digest:%s' % typ
1327 param = 'digest:%s' % typ
1327 try:
1328 try:
1328 value = inpart.params[param]
1329 value = inpart.params[param]
1329 except KeyError:
1330 except KeyError:
1330 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1331 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1331 param)
1332 param)
1332 digests[typ] = value
1333 digests[typ] = value
1333
1334
1334 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1335 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1335
1336
1336 # Make sure we trigger a transaction creation
1337 # Make sure we trigger a transaction creation
1337 #
1338 #
1338 # The addchangegroup function will get a transaction object by itself, but
1339 # The addchangegroup function will get a transaction object by itself, but
1339 # we need to make sure we trigger the creation of a transaction object used
1340 # we need to make sure we trigger the creation of a transaction object used
1340 # for the whole processing scope.
1341 # for the whole processing scope.
1341 op.gettransaction()
1342 op.gettransaction()
1342 from . import exchange
1343 from . import exchange
1343 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1344 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1344 if not isinstance(cg, changegroup.cg1unpacker):
1345 if not isinstance(cg, changegroup.cg1unpacker):
1345 raise error.Abort(_('%s: not a bundle version 1.0') %
1346 raise error.Abort(_('%s: not a bundle version 1.0') %
1346 util.hidepassword(raw_url))
1347 util.hidepassword(raw_url))
1347 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1348 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1348 op.records.add('changegroup', {'return': ret})
1349 op.records.add('changegroup', {'return': ret})
1349 if op.reply is not None:
1350 if op.reply is not None:
1350 # This is definitely not the final form of this
1351 # This is definitely not the final form of this
1351 # return. But one needs to start somewhere.
1352 # return. But one needs to start somewhere.
1352 part = op.reply.newpart('reply:changegroup')
1353 part = op.reply.newpart('reply:changegroup')
1353 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1354 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1354 part.addparam('return', '%i' % ret, mandatory=False)
1355 part.addparam('return', '%i' % ret, mandatory=False)
1355 try:
1356 try:
1356 real_part.validate()
1357 real_part.validate()
1357 except error.Abort as e:
1358 except error.Abort as e:
1358 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1359 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1359 (util.hidepassword(raw_url), str(e)))
1360 (util.hidepassword(raw_url), str(e)))
1360 assert not inpart.read()
1361 assert not inpart.read()
1361
1362
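# Minimal sketch of the parameters the 'remote-changegroup' handler above
# expects, assuming a single sha1 digest; one 'digest:<type>' entry is
# required for every algorithm listed in the 'digests' value.
def _remotechangegroupparams_sketch(url, size, sha1hex):
    return [('url', url),
            ('size', '%d' % size),
            ('digests', 'sha1'),
            ('digest:sha1', sha1hex)]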
1362 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1363 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1363 def handlereplychangegroup(op, inpart):
1364 def handlereplychangegroup(op, inpart):
1364 ret = int(inpart.params['return'])
1365 ret = int(inpart.params['return'])
1365 replyto = int(inpart.params['in-reply-to'])
1366 replyto = int(inpart.params['in-reply-to'])
1366 op.records.add('changegroup', {'return': ret}, replyto)
1367 op.records.add('changegroup', {'return': ret}, replyto)
1367
1368
1368 @parthandler('check:heads')
1369 @parthandler('check:heads')
1369 def handlecheckheads(op, inpart):
1370 def handlecheckheads(op, inpart):
1370 """check that the heads of the repo did not change
1371 """check that the heads of the repo did not change
1371
1372
1372 This is used to detect a push race when using unbundle.
1373 This is used to detect a push race when using unbundle.
1373 This replaces the "heads" argument of unbundle."""
1374 This replaces the "heads" argument of unbundle."""
1374 h = inpart.read(20)
1375 h = inpart.read(20)
1375 heads = []
1376 heads = []
1376 while len(h) == 20:
1377 while len(h) == 20:
1377 heads.append(h)
1378 heads.append(h)
1378 h = inpart.read(20)
1379 h = inpart.read(20)
1379 assert not h
1380 assert not h
1380 # Trigger a transaction so that we are guaranteed to have the lock now.
1381 # Trigger a transaction so that we are guaranteed to have the lock now.
1381 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1382 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1382 op.gettransaction()
1383 op.gettransaction()
1383 if heads != op.repo.heads():
1384 if heads != op.repo.heads():
1384 raise error.PushRaced('repository changed while pushing - '
1385 raise error.PushRaced('repository changed while pushing - '
1385 'please try again')
1386 'please try again')
1386
1387
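# Minimal sketch of the 'check:heads' payload consumed above: the expected
# heads are sent as a flat run of raw 20-byte binary nodes with no separator,
# which is why the handler reads them back 20 bytes at a time.
def _checkheadspayload_sketch(repo):
    return ''.join(repo.heads())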
1387 @parthandler('output')
1388 @parthandler('output')
1388 def handleoutput(op, inpart):
1389 def handleoutput(op, inpart):
1389 """forward output captured on the server to the client"""
1390 """forward output captured on the server to the client"""
1390 for line in inpart.read().splitlines():
1391 for line in inpart.read().splitlines():
1391 op.ui.status(('remote: %s\n' % line))
1392 op.ui.status(('remote: %s\n' % line))
1392
1393
1393 @parthandler('replycaps')
1394 @parthandler('replycaps')
1394 def handlereplycaps(op, inpart):
1395 def handlereplycaps(op, inpart):
1395 """Notify that a reply bundle should be created
1396 """Notify that a reply bundle should be created
1396
1397
1397 The payload contains the capabilities information for the reply"""
1398 The payload contains the capabilities information for the reply"""
1398 caps = decodecaps(inpart.read())
1399 caps = decodecaps(inpart.read())
1399 if op.reply is None:
1400 if op.reply is None:
1400 op.reply = bundle20(op.ui, caps)
1401 op.reply = bundle20(op.ui, caps)
1401
1402
1402 @parthandler('error:abort', ('message', 'hint'))
1403 @parthandler('error:abort', ('message', 'hint'))
1403 def handleerrorabort(op, inpart):
1404 def handleerrorabort(op, inpart):
1404 """Used to transmit abort error over the wire"""
1405 """Used to transmit abort error over the wire"""
1405 raise error.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1406 raise error.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1406
1407
1407 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1408 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1408 'in-reply-to'))
1409 'in-reply-to'))
1409 def handleerrorpushkey(op, inpart):
1410 def handleerrorpushkey(op, inpart):
1410 """Used to transmit failure of a mandatory pushkey over the wire"""
1411 """Used to transmit failure of a mandatory pushkey over the wire"""
1411 kwargs = {}
1412 kwargs = {}
1412 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1413 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1413 value = inpart.params.get(name)
1414 value = inpart.params.get(name)
1414 if value is not None:
1415 if value is not None:
1415 kwargs[name] = value
1416 kwargs[name] = value
1416 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1417 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1417
1418
1418 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1419 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1419 def handleerrorunsupportedcontent(op, inpart):
1420 def handleerrorunsupportedcontent(op, inpart):
1420 """Used to transmit unknown content error over the wire"""
1421 """Used to transmit unknown content error over the wire"""
1421 kwargs = {}
1422 kwargs = {}
1422 parttype = inpart.params.get('parttype')
1423 parttype = inpart.params.get('parttype')
1423 if parttype is not None:
1424 if parttype is not None:
1424 kwargs['parttype'] = parttype
1425 kwargs['parttype'] = parttype
1425 params = inpart.params.get('params')
1426 params = inpart.params.get('params')
1426 if params is not None:
1427 if params is not None:
1427 kwargs['params'] = params.split('\0')
1428 kwargs['params'] = params.split('\0')
1428
1429
1429 raise error.BundleUnknownFeatureError(**kwargs)
1430 raise error.BundleUnknownFeatureError(**kwargs)
1430
1431
1431 @parthandler('error:pushraced', ('message',))
1432 @parthandler('error:pushraced', ('message',))
1432 def handleerrorpushraced(op, inpart):
1433 def handleerrorpushraced(op, inpart):
1433 """Used to transmit push race error over the wire"""
1434 """Used to transmit push race error over the wire"""
1434 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1435 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1435
1436
1436 @parthandler('listkeys', ('namespace',))
1437 @parthandler('listkeys', ('namespace',))
1437 def handlelistkeys(op, inpart):
1438 def handlelistkeys(op, inpart):
1438 """retrieve pushkey namespace content stored in a bundle2"""
1439 """retrieve pushkey namespace content stored in a bundle2"""
1439 namespace = inpart.params['namespace']
1440 namespace = inpart.params['namespace']
1440 r = pushkey.decodekeys(inpart.read())
1441 r = pushkey.decodekeys(inpart.read())
1441 op.records.add('listkeys', (namespace, r))
1442 op.records.add('listkeys', (namespace, r))
1442
1443
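# Illustrative sketch, assuming a finished bundle operation ``op`` whose
# records support the ``op.records['listkeys']`` lookups used elsewhere in
# Mercurial: the (namespace, keys) records added above can be folded back
# into one dict per namespace by the caller.
def _listkeysrecords_sketch(op):
    result = {}
    for namespace, keys in op.records['listkeys']:
        result.setdefault(namespace, {}).update(keys)
    return result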
1443 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1444 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1444 def handlepushkey(op, inpart):
1445 def handlepushkey(op, inpart):
1445 """process a pushkey request"""
1446 """process a pushkey request"""
1446 dec = pushkey.decode
1447 dec = pushkey.decode
1447 namespace = dec(inpart.params['namespace'])
1448 namespace = dec(inpart.params['namespace'])
1448 key = dec(inpart.params['key'])
1449 key = dec(inpart.params['key'])
1449 old = dec(inpart.params['old'])
1450 old = dec(inpart.params['old'])
1450 new = dec(inpart.params['new'])
1451 new = dec(inpart.params['new'])
1451 # Grab the transaction to ensure that we have the lock before performing the
1452 # Grab the transaction to ensure that we have the lock before performing the
1452 # pushkey.
1453 # pushkey.
1453 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1454 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1454 op.gettransaction()
1455 op.gettransaction()
1455 ret = op.repo.pushkey(namespace, key, old, new)
1456 ret = op.repo.pushkey(namespace, key, old, new)
1456 record = {'namespace': namespace,
1457 record = {'namespace': namespace,
1457 'key': key,
1458 'key': key,
1458 'old': old,
1459 'old': old,
1459 'new': new}
1460 'new': new}
1460 op.records.add('pushkey', record)
1461 op.records.add('pushkey', record)
1461 if op.reply is not None:
1462 if op.reply is not None:
1462 rpart = op.reply.newpart('reply:pushkey')
1463 rpart = op.reply.newpart('reply:pushkey')
1463 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1464 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1464 rpart.addparam('return', '%i' % ret, mandatory=False)
1465 rpart.addparam('return', '%i' % ret, mandatory=False)
1465 if inpart.mandatory and not ret:
1466 if inpart.mandatory and not ret:
1466 kwargs = {}
1467 kwargs = {}
1467 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1468 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1468 if key in inpart.params:
1469 if key in inpart.params:
1469 kwargs[key] = inpart.params[key]
1470 kwargs[key] = inpart.params[key]
1470 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1471 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1471
1472
1472 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1473 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1473 def handlepushkeyreply(op, inpart):
1474 def handlepushkeyreply(op, inpart):
1474 """retrieve the result of a pushkey request"""
1475 """retrieve the result of a pushkey request"""
1475 ret = int(inpart.params['return'])
1476 ret = int(inpart.params['return'])
1476 partid = int(inpart.params['in-reply-to'])
1477 partid = int(inpart.params['in-reply-to'])
1477 op.records.add('pushkey', {'return': ret}, partid)
1478 op.records.add('pushkey', {'return': ret}, partid)
1478
1479
1479 @parthandler('obsmarkers')
1480 @parthandler('obsmarkers')
1480 def handleobsmarker(op, inpart):
1481 def handleobsmarker(op, inpart):
1481 """add a stream of obsmarkers to the repo"""
1482 """add a stream of obsmarkers to the repo"""
1482 tr = op.gettransaction()
1483 tr = op.gettransaction()
1483 markerdata = inpart.read()
1484 markerdata = inpart.read()
1484 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1485 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1485 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1486 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1486 % len(markerdata))
1487 % len(markerdata))
1487 # The mergemarkers call will crash if marker creation is not enabled.
1488 # The mergemarkers call will crash if marker creation is not enabled.
1488 # we want to avoid this if the part is advisory.
1489 # we want to avoid this if the part is advisory.
1489 if not inpart.mandatory and op.repo.obsstore.readonly:
1490 if not inpart.mandatory and op.repo.obsstore.readonly:
1490 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1491 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1491 return
1492 return
1492 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1493 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1493 if new:
1494 if new:
1494 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1495 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1495 op.records.add('obsmarkers', {'new': new})
1496 op.records.add('obsmarkers', {'new': new})
1496 if op.reply is not None:
1497 if op.reply is not None:
1497 rpart = op.reply.newpart('reply:obsmarkers')
1498 rpart = op.reply.newpart('reply:obsmarkers')
1498 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1499 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1499 rpart.addparam('new', '%i' % new, mandatory=False)
1500 rpart.addparam('new', '%i' % new, mandatory=False)
1500
1501
1501
1502
1502 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1503 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1503 def handleobsmarkerreply(op, inpart):
1504 def handleobsmarkerreply(op, inpart):
1504 """retrieve the result of an obsmarkers request"""
1505 """retrieve the result of an obsmarkers request"""
1505 ret = int(inpart.params['new'])
1506 ret = int(inpart.params['new'])
1506 partid = int(inpart.params['in-reply-to'])
1507 partid = int(inpart.params['in-reply-to'])
1507 op.records.add('obsmarkers', {'new': ret}, partid)
1508 op.records.add('obsmarkers', {'new': ret}, partid)
1508
1509
1509 @parthandler('hgtagsfnodes')
1510 @parthandler('hgtagsfnodes')
1510 def handlehgtagsfnodes(op, inpart):
1511 def handlehgtagsfnodes(op, inpart):
1511 """Applies .hgtags fnodes cache entries to the local repo.
1512 """Applies .hgtags fnodes cache entries to the local repo.
1512
1513
1513 Payload is pairs of 20 byte changeset nodes and filenodes.
1514 Payload is pairs of 20 byte changeset nodes and filenodes.
1514 """
1515 """
1515 # Grab the transaction so we ensure that we have the lock at this point.
1516 # Grab the transaction so we ensure that we have the lock at this point.
1516 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1517 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1517 op.gettransaction()
1518 op.gettransaction()
1518 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1519 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1519
1520
1520 count = 0
1521 count = 0
1521 while True:
1522 while True:
1522 node = inpart.read(20)
1523 node = inpart.read(20)
1523 fnode = inpart.read(20)
1524 fnode = inpart.read(20)
1524 if len(node) < 20 or len(fnode) < 20:
1525 if len(node) < 20 or len(fnode) < 20:
1525 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1526 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1526 break
1527 break
1527 cache.setfnode(node, fnode)
1528 cache.setfnode(node, fnode)
1528 count += 1
1529 count += 1
1529
1530
1530 cache.write()
1531 cache.write()
1531 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1532 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
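# Minimal sketch of the 'hgtagsfnodes' payload read above: a flat sequence of
# (changeset node, .hgtags filenode) pairs, each a raw 20-byte string.
def _hgtagsfnodespayload_sketch(pairs):
    return ''.join(node + fnode for node, fnode in pairs)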