applybundle: take source as argument...
Pierre-Yves David
r26793:f37de5d4 default
@@ -1,1532 +1,1534
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
3 # Copyright 2013 Facebook, Inc.
4 #
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
8
9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 that will be handed to and processed by the application layer.
12
13
14 General format architecture
15 ===========================
16
17 The format is architected as follows:
18
19 - magic string
20 - stream level parameters
21 - payload parts (any number)
22 - end of stream marker.
23
24 The binary format
25 ============================
26
27 All numbers are unsigned and big-endian.
28
29 stream level parameters
30 ------------------------
31
32 Binary format is as follows:
33
34 :params size: int32
35
36 The total number of Bytes used by the parameters
37
38 :params value: arbitrary number of Bytes
39
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
42
43 The blob contains a space separated list of parameters. Parameters with a value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
46 Empty names are obviously forbidden.
47
48 A name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However, when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if it is not able to process it.
52
53 Stream parameters use a simple textual format for two main reasons:
54
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
59
60 Any applicative level options MUST go into a bundle2 part instead.
61
62 Payload part
63 ------------------------
64
65 Binary format is as follows:
66
67 :header size: int32
68
69 The total number of Bytes used by the part header. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
71
72 :header:
73
74 The header defines how to interpret the part. It contains two pieces of
75 data: the part type, and the part parameters.
76
77 The part type is used to route the part to an application level handler that
78 can interpret the payload.
79
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
82 interpret the part payload.
83
84 The binary format of the header is as follows:
85
86 :typesize: (one byte)
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
92
93 :parameters:
94
95 Part parameters may have arbitrary content; the binary structure is::
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
99 :mandatory-count: 1 byte, number of mandatory parameters
100
101 :advisory-count: 1 byte, number of advisory parameters
102
103 :param-sizes:
104
105 N couples of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
107
108 :param-data:
109
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
112 field.
113
114 Mandatory parameters come first, then the advisory ones.
115
116 Each parameter's key MUST be unique within the part.
117
118 :payload:
119
120 The payload is a series of `<chunksize><chunkdata>`.
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says). The payload part is concluded by a zero size chunk.
124
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
127
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
130
131 Bundle processing
132 ============================
133
134 Each part is processed in order using a "part handler". Handlers are registered
135 for a certain part type.
136
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the part type
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the parts read after the abort are processed. In
144 the future, dropping the stream may become an option for channels we do not
145 care to preserve.
146 """
147
147
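To make the container layout described above concrete, here is a minimal sketch (not part of bundle2.py) that assembles an empty HG20 bundle by hand, following only the rules in the docstring: the magic string, an int32-prefixed stream-parameter blob, and the zero-size part header that serves as the end-of-stream marker. The helper name and the example parameter handling are invented for illustration.

import struct
import urllib

def _tinyemptybundle(params=None):
    # magic string: format name and version
    chunks = ['HG20']
    # stream level parameters: urlquoted, space separated, int32 size prefix
    blob = ' '.join('%s=%s' % (urllib.quote(k), urllib.quote(v))
                    for k, v in sorted((params or {}).items()))
    chunks.append(struct.pack('>i', len(blob)))
    if blob:
        chunks.append(blob)
    # no payload parts: a zero-size part header is the end-of-stream marker
    chunks.append(struct.pack('>i', 0))
    return ''.join(chunks)

# with no parameters this is just 'HG20' followed by two int32 zeros
assert _tinyemptybundle() == 'HG20' + struct.pack('>i', 0) * 2
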
148 from __future__ import absolute_import
148 from __future__ import absolute_import
149
149
150 import errno
150 import errno
151 import re
151 import re
152 import string
152 import string
153 import struct
153 import struct
154 import sys
154 import sys
155 import urllib
155 import urllib
156
156
157 from .i18n import _
157 from .i18n import _
158 from . import (
158 from . import (
159 changegroup,
159 changegroup,
160 error,
160 error,
161 obsolete,
161 obsolete,
162 pushkey,
162 pushkey,
163 tags,
163 tags,
164 url,
164 url,
165 util,
165 util,
166 )
166 )
167
167
168 _pack = struct.pack
168 _pack = struct.pack
169 _unpack = struct.unpack
169 _unpack = struct.unpack
170
170
171 _fstreamparamsize = '>i'
171 _fstreamparamsize = '>i'
172 _fpartheadersize = '>i'
172 _fpartheadersize = '>i'
173 _fparttypesize = '>B'
173 _fparttypesize = '>B'
174 _fpartid = '>I'
174 _fpartid = '>I'
175 _fpayloadsize = '>i'
175 _fpayloadsize = '>i'
176 _fpartparamcount = '>BB'
176 _fpartparamcount = '>BB'
177
177
178 preferedchunksize = 4096
178 preferedchunksize = 4096
179
179
180 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
180 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
181
181
182 def outdebug(ui, message):
182 def outdebug(ui, message):
183 """debug regarding output stream (bundling)"""
183 """debug regarding output stream (bundling)"""
184 if ui.configbool('devel', 'bundle2.debug', False):
184 if ui.configbool('devel', 'bundle2.debug', False):
185 ui.debug('bundle2-output: %s\n' % message)
185 ui.debug('bundle2-output: %s\n' % message)
186
186
187 def indebug(ui, message):
187 def indebug(ui, message):
188 """debug on input stream (unbundling)"""
188 """debug on input stream (unbundling)"""
189 if ui.configbool('devel', 'bundle2.debug', False):
189 if ui.configbool('devel', 'bundle2.debug', False):
190 ui.debug('bundle2-input: %s\n' % message)
190 ui.debug('bundle2-input: %s\n' % message)
191
191
192 def validateparttype(parttype):
192 def validateparttype(parttype):
193 """raise ValueError if a parttype contains invalid character"""
193 """raise ValueError if a parttype contains invalid character"""
194 if _parttypeforbidden.search(parttype):
194 if _parttypeforbidden.search(parttype):
195 raise ValueError(parttype)
195 raise ValueError(parttype)
196
196
197 def _makefpartparamsizes(nbparams):
197 def _makefpartparamsizes(nbparams):
198 """return a struct format to read part parameter sizes
198 """return a struct format to read part parameter sizes
199
199
200 The number of parameters is variable so we need to build that format
200 The number of parameters is variable so we need to build that format
201 dynamically.
201 dynamically.
202 """
202 """
203 return '>'+('BB'*nbparams)
203 return '>'+('BB'*nbparams)
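As a small illustration of the generated format (using nothing beyond the struct module already imported above), two parameters produce a four-byte size header:

# two parameters -> '>BBBB', i.e. four unsigned bytes in one unpack call
fmt = _makefpartparamsizes(2)
assert fmt == '>BBBB'
# e.g. a 3-byte key with a 5-byte value, then a 4-byte key with an empty value
assert struct.unpack(fmt, '\x03\x05\x04\x00') == (3, 5, 4, 0)
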
204
204
205 parthandlermapping = {}
205 parthandlermapping = {}
206
206
207 def parthandler(parttype, params=()):
207 def parthandler(parttype, params=()):
208 """decorator that register a function as a bundle2 part handler
208 """decorator that register a function as a bundle2 part handler
209
209
210 eg::
210 eg::
211
211
212 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
212 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
213 def myparttypehandler(...):
213 def myparttypehandler(...):
214 '''process a part of type "my part".'''
214 '''process a part of type "my part".'''
215 ...
215 ...
216 """
216 """
217 validateparttype(parttype)
217 validateparttype(parttype)
218 def _decorator(func):
218 def _decorator(func):
219 lparttype = parttype.lower() # enforce lower case matching.
219 lparttype = parttype.lower() # enforce lower case matching.
220 assert lparttype not in parthandlermapping
220 assert lparttype not in parthandlermapping
221 parthandlermapping[lparttype] = func
221 parthandlermapping[lparttype] = func
222 func.params = frozenset(params)
222 func.params = frozenset(params)
223 return func
223 return func
224 return _decorator
224 return _decorator
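A hedged sketch of how an extension might use this decorator; the part type 'x-example:noop' and its 'count' parameter are hypothetical, not parts that Mercurial actually defines:

@parthandler('x-example:noop', ('count',))
def handlenoop(op, part):
    '''illustrative handler: record the mandatory 'count' parameter and
    ignore the payload'''
    count = int(part.params['count'])
    op.records.add('x-example:noop', {'count': count})
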
225
225
226 class unbundlerecords(object):
226 class unbundlerecords(object):
227 """keep record of what happens during and unbundle
227 """keep record of what happens during and unbundle
228
228
229 New records are added using `records.add('cat', obj)`. Where 'cat' is a
229 New records are added using `records.add('cat', obj)`. Where 'cat' is a
230 category of record and obj is an arbitrary object.
230 category of record and obj is an arbitrary object.
231
231
232 `records['cat']` will return all entries of this category 'cat'.
232 `records['cat']` will return all entries of this category 'cat'.
233
233
234 Iterating on the object itself will yield `('category', obj)` tuples
234 Iterating on the object itself will yield `('category', obj)` tuples
235 for all entries.
235 for all entries.
236
236
237 All iterations happens in chronological order.
237 All iterations happens in chronological order.
238 """
238 """
239
239
240 def __init__(self):
240 def __init__(self):
241 self._categories = {}
241 self._categories = {}
242 self._sequences = []
242 self._sequences = []
243 self._replies = {}
243 self._replies = {}
244
244
245 def add(self, category, entry, inreplyto=None):
245 def add(self, category, entry, inreplyto=None):
246 """add a new record of a given category.
246 """add a new record of a given category.
247
247
248 The entry can then be retrieved in the list returned by
248 The entry can then be retrieved in the list returned by
249 self['category']."""
249 self['category']."""
250 self._categories.setdefault(category, []).append(entry)
250 self._categories.setdefault(category, []).append(entry)
251 self._sequences.append((category, entry))
251 self._sequences.append((category, entry))
252 if inreplyto is not None:
252 if inreplyto is not None:
253 self.getreplies(inreplyto).add(category, entry)
253 self.getreplies(inreplyto).add(category, entry)
254
254
255 def getreplies(self, partid):
255 def getreplies(self, partid):
256 """get the records that are replies to a specific part"""
256 """get the records that are replies to a specific part"""
257 return self._replies.setdefault(partid, unbundlerecords())
257 return self._replies.setdefault(partid, unbundlerecords())
258
258
259 def __getitem__(self, cat):
259 def __getitem__(self, cat):
260 return tuple(self._categories.get(cat, ()))
260 return tuple(self._categories.get(cat, ()))
261
261
262 def __iter__(self):
262 def __iter__(self):
263 return iter(self._sequences)
263 return iter(self._sequences)
264
264
265 def __len__(self):
265 def __len__(self):
266 return len(self._sequences)
266 return len(self._sequences)
267
267
268 def __nonzero__(self):
268 def __nonzero__(self):
269 return bool(self._sequences)
269 return bool(self._sequences)
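A small usage sketch of this record container (categories and entries invented for illustration):

records = unbundlerecords()
records.add('changegroup', {'return': 1})
records.add('output', 'remote: added 2 changesets\n')
assert len(records) == 2
assert records['changegroup'] == ({'return': 1},)
# iteration yields (category, entry) pairs in insertion order
assert [cat for cat, entry in records] == ['changegroup', 'output']
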
270
270
271 class bundleoperation(object):
271 class bundleoperation(object):
272 """an object that represents a single bundling process
272 """an object that represents a single bundling process
273
273
274 Its purpose is to carry unbundle-related objects and states.
274 Its purpose is to carry unbundle-related objects and states.
275
275
276 A new object should be created at the beginning of each bundle processing.
276 A new object should be created at the beginning of each bundle processing.
277 The object is to be returned by the processing function.
277 The object is to be returned by the processing function.
278
278
279 The object has very little content for now; it will ultimately contain:
279 The object has very little content for now; it will ultimately contain:
280 * an access to the repo the bundle is applied to,
280 * an access to the repo the bundle is applied to,
281 * a ui object,
281 * a ui object,
282 * a way to retrieve a transaction to add changes to the repo,
282 * a way to retrieve a transaction to add changes to the repo,
283 * a way to record the result of processing each part,
283 * a way to record the result of processing each part,
284 * a way to construct a bundle response when applicable.
284 * a way to construct a bundle response when applicable.
285 """
285 """
286
286
287 def __init__(self, repo, transactiongetter, captureoutput=True):
287 def __init__(self, repo, transactiongetter, captureoutput=True):
288 self.repo = repo
288 self.repo = repo
289 self.ui = repo.ui
289 self.ui = repo.ui
290 self.records = unbundlerecords()
290 self.records = unbundlerecords()
291 self.gettransaction = transactiongetter
291 self.gettransaction = transactiongetter
292 self.reply = None
292 self.reply = None
293 self.captureoutput = captureoutput
293 self.captureoutput = captureoutput
294
294
295 class TransactionUnavailable(RuntimeError):
295 class TransactionUnavailable(RuntimeError):
296 pass
296 pass
297
297
298 def _notransaction():
298 def _notransaction():
299 """default method to get a transaction while processing a bundle
299 """default method to get a transaction while processing a bundle
300
300
301 Raise an exception to highlight the fact that no transaction was expected
301 Raise an exception to highlight the fact that no transaction was expected
302 to be created"""
302 to be created"""
303 raise TransactionUnavailable()
303 raise TransactionUnavailable()
304
304
305 def applybundle(repo, unbundler, tr, op=None):
305 def applybundle(repo, unbundler, tr, source=None, op=None):
306 # transform me into unbundler.apply() as soon as the freeze is lifted
306 # transform me into unbundler.apply() as soon as the freeze is lifted
307 tr.hookargs['bundle2'] = '1'
307 tr.hookargs['bundle2'] = '1'
308 if source is not None and 'source' not in tr.hookargs:
309 tr.hookargs['source'] = source
308 return processbundle(repo, unbundler, lambda: tr, op=op)
310 return processbundle(repo, unbundler, lambda: tr, op=op)
309
311
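This changeset's point is to thread a 'source' value through to the transaction's hook arguments. A caller-side sketch (assuming an existing repo, an open transaction tr and a bundle file object fp; this is not code from the changeset itself):

def _applyfromfile(repo, tr, fp):
    # read the magic string and pick the matching unbundler (HG20 here)
    unbundler = getunbundler(repo.ui, fp)
    # 'push' ends up in tr.hookargs['source'] unless a source is already set,
    # so hooks fired by the transaction can see where the changes came from
    return applybundle(repo, unbundler, tr, source='push')
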
310 def processbundle(repo, unbundler, transactiongetter=None, op=None):
312 def processbundle(repo, unbundler, transactiongetter=None, op=None):
311 """This function process a bundle, apply effect to/from a repo
313 """This function process a bundle, apply effect to/from a repo
312
314
313 It iterates over each part then searches for and uses the proper handling
315 It iterates over each part then searches for and uses the proper handling
314 code to process the part. Parts are processed in order.
316 code to process the part. Parts are processed in order.
315
317
316 This is a very early version of this function that will be strongly reworked
318 This is a very early version of this function that will be strongly reworked
317 before final usage.
319 before final usage.
318
320
319 Unknown Mandatory part will abort the process.
321 Unknown Mandatory part will abort the process.
320
322
321 It is temporarily possible to provide a prebuilt bundleoperation to the
323 It is temporarily possible to provide a prebuilt bundleoperation to the
322 function. This is used to ensure output is properly propagated in case of
324 function. This is used to ensure output is properly propagated in case of
323 an error during the unbundling. This output capturing part will likely be
325 an error during the unbundling. This output capturing part will likely be
324 reworked and this ability will probably go away in the process.
326 reworked and this ability will probably go away in the process.
325 """
327 """
326 if op is None:
328 if op is None:
327 if transactiongetter is None:
329 if transactiongetter is None:
328 transactiongetter = _notransaction
330 transactiongetter = _notransaction
329 op = bundleoperation(repo, transactiongetter)
331 op = bundleoperation(repo, transactiongetter)
330 # todo:
332 # todo:
331 # - replace this is a init function soon.
333 # - replace this is a init function soon.
332 # - exception catching
334 # - exception catching
333 unbundler.params
335 unbundler.params
334 if repo.ui.debugflag:
336 if repo.ui.debugflag:
335 msg = ['bundle2-input-bundle:']
337 msg = ['bundle2-input-bundle:']
336 if unbundler.params:
338 if unbundler.params:
337 msg.append(' %i params')
339 msg.append(' %i params')
338 if op.gettransaction is None:
340 if op.gettransaction is None:
339 msg.append(' no-transaction')
341 msg.append(' no-transaction')
340 else:
342 else:
341 msg.append(' with-transaction')
343 msg.append(' with-transaction')
342 msg.append('\n')
344 msg.append('\n')
343 repo.ui.debug(''.join(msg))
345 repo.ui.debug(''.join(msg))
344 iterparts = enumerate(unbundler.iterparts())
346 iterparts = enumerate(unbundler.iterparts())
345 part = None
347 part = None
346 nbpart = 0
348 nbpart = 0
347 try:
349 try:
348 for nbpart, part in iterparts:
350 for nbpart, part in iterparts:
349 _processpart(op, part)
351 _processpart(op, part)
350 except BaseException as exc:
352 except BaseException as exc:
351 for nbpart, part in iterparts:
353 for nbpart, part in iterparts:
352 # consume the bundle content
354 # consume the bundle content
353 part.seek(0, 2)
355 part.seek(0, 2)
354 # Small hack to let caller code distinguish exceptions from bundle2
356 # Small hack to let caller code distinguish exceptions from bundle2
355 # processing from processing the old format. This is mostly
357 # processing from processing the old format. This is mostly
356 # needed to handle different return codes to unbundle according to the
358 # needed to handle different return codes to unbundle according to the
357 # type of bundle. We should probably clean up or drop this return code
359 # type of bundle. We should probably clean up or drop this return code
358 # craziness in a future version.
360 # craziness in a future version.
359 exc.duringunbundle2 = True
361 exc.duringunbundle2 = True
360 salvaged = []
362 salvaged = []
361 replycaps = None
363 replycaps = None
362 if op.reply is not None:
364 if op.reply is not None:
363 salvaged = op.reply.salvageoutput()
365 salvaged = op.reply.salvageoutput()
364 replycaps = op.reply.capabilities
366 replycaps = op.reply.capabilities
365 exc._replycaps = replycaps
367 exc._replycaps = replycaps
366 exc._bundle2salvagedoutput = salvaged
368 exc._bundle2salvagedoutput = salvaged
367 raise
369 raise
368 finally:
370 finally:
369 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
371 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
370
372
371 return op
373 return op
372
374
373 def _processpart(op, part):
375 def _processpart(op, part):
374 """process a single part from a bundle
376 """process a single part from a bundle
375
377
376 The part is guaranteed to have been fully consumed when the function exits
378 The part is guaranteed to have been fully consumed when the function exits
377 (even if an exception is raised)."""
379 (even if an exception is raised)."""
378 status = 'unknown' # used by debug output
380 status = 'unknown' # used by debug output
379 try:
381 try:
380 try:
382 try:
381 handler = parthandlermapping.get(part.type)
383 handler = parthandlermapping.get(part.type)
382 if handler is None:
384 if handler is None:
383 status = 'unsupported-type'
385 status = 'unsupported-type'
384 raise error.BundleUnknownFeatureError(parttype=part.type)
386 raise error.BundleUnknownFeatureError(parttype=part.type)
385 indebug(op.ui, 'found a handler for part %r' % part.type)
387 indebug(op.ui, 'found a handler for part %r' % part.type)
386 unknownparams = part.mandatorykeys - handler.params
388 unknownparams = part.mandatorykeys - handler.params
387 if unknownparams:
389 if unknownparams:
388 unknownparams = list(unknownparams)
390 unknownparams = list(unknownparams)
389 unknownparams.sort()
391 unknownparams.sort()
390 status = 'unsupported-params (%s)' % unknownparams
392 status = 'unsupported-params (%s)' % unknownparams
391 raise error.BundleUnknownFeatureError(parttype=part.type,
393 raise error.BundleUnknownFeatureError(parttype=part.type,
392 params=unknownparams)
394 params=unknownparams)
393 status = 'supported'
395 status = 'supported'
394 except error.BundleUnknownFeatureError as exc:
396 except error.BundleUnknownFeatureError as exc:
395 if part.mandatory: # mandatory parts
397 if part.mandatory: # mandatory parts
396 raise
398 raise
397 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
399 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
398 return # skip to part processing
400 return # skip to part processing
399 finally:
401 finally:
400 if op.ui.debugflag:
402 if op.ui.debugflag:
401 msg = ['bundle2-input-part: "%s"' % part.type]
403 msg = ['bundle2-input-part: "%s"' % part.type]
402 if not part.mandatory:
404 if not part.mandatory:
403 msg.append(' (advisory)')
405 msg.append(' (advisory)')
404 nbmp = len(part.mandatorykeys)
406 nbmp = len(part.mandatorykeys)
405 nbap = len(part.params) - nbmp
407 nbap = len(part.params) - nbmp
406 if nbmp or nbap:
408 if nbmp or nbap:
407 msg.append(' (params:')
409 msg.append(' (params:')
408 if nbmp:
410 if nbmp:
409 msg.append(' %i mandatory' % nbmp)
411 msg.append(' %i mandatory' % nbmp)
410 if nbap:
412 if nbap:
411 msg.append(' %i advisory' % nbap)
413 msg.append(' %i advisory' % nbap)
412 msg.append(')')
414 msg.append(')')
413 msg.append(' %s\n' % status)
415 msg.append(' %s\n' % status)
414 op.ui.debug(''.join(msg))
416 op.ui.debug(''.join(msg))
415
417
416 # handler is called outside the above try block so that we don't
418 # handler is called outside the above try block so that we don't
417 # risk catching KeyErrors from anything other than the
419 # risk catching KeyErrors from anything other than the
418 # parthandlermapping lookup (any KeyError raised by handler()
420 # parthandlermapping lookup (any KeyError raised by handler()
419 # itself represents a defect of a different variety).
421 # itself represents a defect of a different variety).
420 output = None
422 output = None
421 if op.captureoutput and op.reply is not None:
423 if op.captureoutput and op.reply is not None:
422 op.ui.pushbuffer(error=True, subproc=True)
424 op.ui.pushbuffer(error=True, subproc=True)
423 output = ''
425 output = ''
424 try:
426 try:
425 handler(op, part)
427 handler(op, part)
426 finally:
428 finally:
427 if output is not None:
429 if output is not None:
428 output = op.ui.popbuffer()
430 output = op.ui.popbuffer()
429 if output:
431 if output:
430 outpart = op.reply.newpart('output', data=output,
432 outpart = op.reply.newpart('output', data=output,
431 mandatory=False)
433 mandatory=False)
432 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
434 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
433 finally:
435 finally:
434 # consume the part content to not corrupt the stream.
436 # consume the part content to not corrupt the stream.
435 part.seek(0, 2)
437 part.seek(0, 2)
436
438
437
439
438 def decodecaps(blob):
440 def decodecaps(blob):
439 """decode a bundle2 caps bytes blob into a dictionary
441 """decode a bundle2 caps bytes blob into a dictionary
440
442
441 The blob is a list of capabilities (one per line)
443 The blob is a list of capabilities (one per line)
442 Capabilities may have values using a line of the form::
444 Capabilities may have values using a line of the form::
443
445
444 capability=value1,value2,value3
446 capability=value1,value2,value3
445
447
446 The values are always a list."""
448 The values are always a list."""
447 caps = {}
449 caps = {}
448 for line in blob.splitlines():
450 for line in blob.splitlines():
449 if not line:
451 if not line:
450 continue
452 continue
451 if '=' not in line:
453 if '=' not in line:
452 key, vals = line, ()
454 key, vals = line, ()
453 else:
455 else:
454 key, vals = line.split('=', 1)
456 key, vals = line.split('=', 1)
455 vals = vals.split(',')
457 vals = vals.split(',')
456 key = urllib.unquote(key)
458 key = urllib.unquote(key)
457 vals = [urllib.unquote(v) for v in vals]
459 vals = [urllib.unquote(v) for v in vals]
458 caps[key] = vals
460 caps[key] = vals
459 return caps
461 return caps
460
462
461 def encodecaps(caps):
463 def encodecaps(caps):
462 """encode a bundle2 caps dictionary into a bytes blob"""
464 """encode a bundle2 caps dictionary into a bytes blob"""
463 chunks = []
465 chunks = []
464 for ca in sorted(caps):
466 for ca in sorted(caps):
465 vals = caps[ca]
467 vals = caps[ca]
466 ca = urllib.quote(ca)
468 ca = urllib.quote(ca)
467 vals = [urllib.quote(v) for v in vals]
469 vals = [urllib.quote(v) for v in vals]
468 if vals:
470 if vals:
469 ca = "%s=%s" % (ca, ','.join(vals))
471 ca = "%s=%s" % (ca, ','.join(vals))
470 chunks.append(ca)
472 chunks.append(ca)
471 return '\n'.join(chunks)
473 return '\n'.join(chunks)
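A short round trip through the two helpers above (capability names invented for the example):

caps = {'HG20': [], 'listkeys': ['namespace1', 'namespace2']}
blob = encodecaps(caps)
# one capability per line, values comma separated and urlquoted
assert blob == 'HG20\nlistkeys=namespace1,namespace2'
assert decodecaps(blob)['listkeys'] == ['namespace1', 'namespace2']
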
472
474
473 class bundle20(object):
475 class bundle20(object):
474 """represent an outgoing bundle2 container
476 """represent an outgoing bundle2 container
475
477
476 Use the `addparam` method to add stream level parameter. and `newpart` to
478 Use the `addparam` method to add stream level parameter. and `newpart` to
477 populate it. Then call `getchunks` to retrieve all the binary chunks of
479 populate it. Then call `getchunks` to retrieve all the binary chunks of
478 data that compose the bundle2 container."""
480 data that compose the bundle2 container."""
479
481
480 _magicstring = 'HG20'
482 _magicstring = 'HG20'
481
483
482 def __init__(self, ui, capabilities=()):
484 def __init__(self, ui, capabilities=()):
483 self.ui = ui
485 self.ui = ui
484 self._params = []
486 self._params = []
485 self._parts = []
487 self._parts = []
486 self.capabilities = dict(capabilities)
488 self.capabilities = dict(capabilities)
487 self._compressor = util.compressors[None]()
489 self._compressor = util.compressors[None]()
488
490
489 def setcompression(self, alg):
491 def setcompression(self, alg):
490 """setup core part compression to <alg>"""
492 """setup core part compression to <alg>"""
491 if alg is None:
493 if alg is None:
492 return
494 return
493 assert not any(n.lower() == 'compression' for n, v in self._params)
495 assert not any(n.lower() == 'compression' for n, v in self._params)
494 self.addparam('Compression', alg)
496 self.addparam('Compression', alg)
495 self._compressor = util.compressors[alg]()
497 self._compressor = util.compressors[alg]()
496
498
497 @property
499 @property
498 def nbparts(self):
500 def nbparts(self):
499 """total number of parts added to the bundler"""
501 """total number of parts added to the bundler"""
500 return len(self._parts)
502 return len(self._parts)
501
503
502 # methods used to defines the bundle2 content
504 # methods used to defines the bundle2 content
503 def addparam(self, name, value=None):
505 def addparam(self, name, value=None):
504 """add a stream level parameter"""
506 """add a stream level parameter"""
505 if not name:
507 if not name:
506 raise ValueError('empty parameter name')
508 raise ValueError('empty parameter name')
507 if name[0] not in string.letters:
509 if name[0] not in string.letters:
508 raise ValueError('non letter first character: %r' % name)
510 raise ValueError('non letter first character: %r' % name)
509 self._params.append((name, value))
511 self._params.append((name, value))
510
512
511 def addpart(self, part):
513 def addpart(self, part):
512 """add a new part to the bundle2 container
514 """add a new part to the bundle2 container
513
515
514 Parts contains the actual applicative payload."""
516 Parts contains the actual applicative payload."""
515 assert part.id is None
517 assert part.id is None
516 part.id = len(self._parts) # very cheap counter
518 part.id = len(self._parts) # very cheap counter
517 self._parts.append(part)
519 self._parts.append(part)
518
520
519 def newpart(self, typeid, *args, **kwargs):
521 def newpart(self, typeid, *args, **kwargs):
520 """create a new part and add it to the containers
522 """create a new part and add it to the containers
521
523
522 As the part is directly added to the containers. For now, this means
524 As the part is directly added to the containers. For now, this means
523 that any failure to properly initialize the part after calling
525 that any failure to properly initialize the part after calling
524 ``newpart`` should result in a failure of the whole bundling process.
526 ``newpart`` should result in a failure of the whole bundling process.
525
527
526 You can still fall back to manually create and add if you need better
528 You can still fall back to manually create and add if you need better
527 control."""
529 control."""
528 part = bundlepart(typeid, *args, **kwargs)
530 part = bundlepart(typeid, *args, **kwargs)
529 self.addpart(part)
531 self.addpart(part)
530 return part
532 return part
531
533
532 # methods used to generate the bundle2 stream
534 # methods used to generate the bundle2 stream
533 def getchunks(self):
535 def getchunks(self):
534 if self.ui.debugflag:
536 if self.ui.debugflag:
535 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
537 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
536 if self._params:
538 if self._params:
537 msg.append(' (%i params)' % len(self._params))
539 msg.append(' (%i params)' % len(self._params))
538 msg.append(' %i parts total\n' % len(self._parts))
540 msg.append(' %i parts total\n' % len(self._parts))
539 self.ui.debug(''.join(msg))
541 self.ui.debug(''.join(msg))
540 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
542 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
541 yield self._magicstring
543 yield self._magicstring
542 param = self._paramchunk()
544 param = self._paramchunk()
543 outdebug(self.ui, 'bundle parameter: %s' % param)
545 outdebug(self.ui, 'bundle parameter: %s' % param)
544 yield _pack(_fstreamparamsize, len(param))
546 yield _pack(_fstreamparamsize, len(param))
545 if param:
547 if param:
546 yield param
548 yield param
547 # starting compression
549 # starting compression
548 for chunk in self._getcorechunk():
550 for chunk in self._getcorechunk():
549 yield self._compressor.compress(chunk)
551 yield self._compressor.compress(chunk)
550 yield self._compressor.flush()
552 yield self._compressor.flush()
551
553
552 def _paramchunk(self):
554 def _paramchunk(self):
553 """return a encoded version of all stream parameters"""
555 """return a encoded version of all stream parameters"""
554 blocks = []
556 blocks = []
555 for par, value in self._params:
557 for par, value in self._params:
556 par = urllib.quote(par)
558 par = urllib.quote(par)
557 if value is not None:
559 if value is not None:
558 value = urllib.quote(value)
560 value = urllib.quote(value)
559 par = '%s=%s' % (par, value)
561 par = '%s=%s' % (par, value)
560 blocks.append(par)
562 blocks.append(par)
561 return ' '.join(blocks)
563 return ' '.join(blocks)
562
564
563 def _getcorechunk(self):
565 def _getcorechunk(self):
564 """yield chunk for the core part of the bundle
566 """yield chunk for the core part of the bundle
565
567
566 (all but headers and parameters)"""
568 (all but headers and parameters)"""
567 outdebug(self.ui, 'start of parts')
569 outdebug(self.ui, 'start of parts')
568 for part in self._parts:
570 for part in self._parts:
569 outdebug(self.ui, 'bundle part: "%s"' % part.type)
571 outdebug(self.ui, 'bundle part: "%s"' % part.type)
570 for chunk in part.getchunks(ui=self.ui):
572 for chunk in part.getchunks(ui=self.ui):
571 yield chunk
573 yield chunk
572 outdebug(self.ui, 'end of bundle')
574 outdebug(self.ui, 'end of bundle')
573 yield _pack(_fpartheadersize, 0)
575 yield _pack(_fpartheadersize, 0)
574
576
575
577
576 def salvageoutput(self):
578 def salvageoutput(self):
577 """return a list with a copy of all output parts in the bundle
579 """return a list with a copy of all output parts in the bundle
578
580
579 This is meant to be used during error handling to make sure we preserve
581 This is meant to be used during error handling to make sure we preserve
580 server output"""
582 server output"""
581 salvaged = []
583 salvaged = []
582 for part in self._parts:
584 for part in self._parts:
583 if part.type.startswith('output'):
585 if part.type.startswith('output'):
584 salvaged.append(part.copy())
586 salvaged.append(part.copy())
585 return salvaged
587 return salvaged
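Putting the bundler together, a hedged sketch of producing a container with one made-up part ('x-example:note' is not a real part type; a ui object is assumed to be available):

def _makeexamplebundle(ui):
    bundler = bundle20(ui)
    part = bundler.newpart('x-example:note', data='hello', mandatory=False)
    part.addparam('origin', 'docs', mandatory=False)
    # concatenating the chunks yields the complete binary container
    return ''.join(bundler.getchunks())
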
586
588
587
589
588 class unpackermixin(object):
590 class unpackermixin(object):
589 """A mixin to extract bytes and struct data from a stream"""
591 """A mixin to extract bytes and struct data from a stream"""
590
592
591 def __init__(self, fp):
593 def __init__(self, fp):
592 self._fp = fp
594 self._fp = fp
593 self._seekable = (util.safehasattr(fp, 'seek') and
595 self._seekable = (util.safehasattr(fp, 'seek') and
594 util.safehasattr(fp, 'tell'))
596 util.safehasattr(fp, 'tell'))
595
597
596 def _unpack(self, format):
598 def _unpack(self, format):
597 """unpack this struct format from the stream"""
599 """unpack this struct format from the stream"""
598 data = self._readexact(struct.calcsize(format))
600 data = self._readexact(struct.calcsize(format))
599 return _unpack(format, data)
601 return _unpack(format, data)
600
602
601 def _readexact(self, size):
603 def _readexact(self, size):
602 """read exactly <size> bytes from the stream"""
604 """read exactly <size> bytes from the stream"""
603 return changegroup.readexactly(self._fp, size)
605 return changegroup.readexactly(self._fp, size)
604
606
605 def seek(self, offset, whence=0):
607 def seek(self, offset, whence=0):
606 """move the underlying file pointer"""
608 """move the underlying file pointer"""
607 if self._seekable:
609 if self._seekable:
608 return self._fp.seek(offset, whence)
610 return self._fp.seek(offset, whence)
609 else:
611 else:
610 raise NotImplementedError(_('File pointer is not seekable'))
612 raise NotImplementedError(_('File pointer is not seekable'))
611
613
612 def tell(self):
614 def tell(self):
613 """return the file offset, or None if file is not seekable"""
615 """return the file offset, or None if file is not seekable"""
614 if self._seekable:
616 if self._seekable:
615 try:
617 try:
616 return self._fp.tell()
618 return self._fp.tell()
617 except IOError as e:
619 except IOError as e:
618 if e.errno == errno.ESPIPE:
620 if e.errno == errno.ESPIPE:
619 self._seekable = False
621 self._seekable = False
620 else:
622 else:
621 raise
623 raise
622 return None
624 return None
623
625
624 def close(self):
626 def close(self):
625 """close underlying file"""
627 """close underlying file"""
626 if util.safehasattr(self._fp, 'close'):
628 if util.safehasattr(self._fp, 'close'):
627 return self._fp.close()
629 return self._fp.close()
628
630
629 def getunbundler(ui, fp, magicstring=None):
631 def getunbundler(ui, fp, magicstring=None):
630 """return a valid unbundler object for a given magicstring"""
632 """return a valid unbundler object for a given magicstring"""
631 if magicstring is None:
633 if magicstring is None:
632 magicstring = changegroup.readexactly(fp, 4)
634 magicstring = changegroup.readexactly(fp, 4)
633 magic, version = magicstring[0:2], magicstring[2:4]
635 magic, version = magicstring[0:2], magicstring[2:4]
634 if magic != 'HG':
636 if magic != 'HG':
635 raise error.Abort(_('not a Mercurial bundle'))
637 raise error.Abort(_('not a Mercurial bundle'))
636 unbundlerclass = formatmap.get(version)
638 unbundlerclass = formatmap.get(version)
637 if unbundlerclass is None:
639 if unbundlerclass is None:
638 raise error.Abort(_('unknown bundle version %s') % version)
640 raise error.Abort(_('unknown bundle version %s') % version)
639 unbundler = unbundlerclass(ui, fp)
641 unbundler = unbundlerclass(ui, fp)
640 indebug(ui, 'start processing of %s stream' % magicstring)
642 indebug(ui, 'start processing of %s stream' % magicstring)
641 return unbundler
643 return unbundler
642
644
643 class unbundle20(unpackermixin):
645 class unbundle20(unpackermixin):
644 """interpret a bundle2 stream
646 """interpret a bundle2 stream
645
647
646 This class is fed with a binary stream and yields parts through its
648 This class is fed with a binary stream and yields parts through its
647 `iterparts` methods."""
649 `iterparts` methods."""
648
650
649 _magicstring = 'HG20'
651 _magicstring = 'HG20'
650
652
651 def __init__(self, ui, fp):
653 def __init__(self, ui, fp):
652 """If header is specified, we do not read it out of the stream."""
654 """If header is specified, we do not read it out of the stream."""
653 self.ui = ui
655 self.ui = ui
654 self._decompressor = util.decompressors[None]
656 self._decompressor = util.decompressors[None]
655 super(unbundle20, self).__init__(fp)
657 super(unbundle20, self).__init__(fp)
656
658
657 @util.propertycache
659 @util.propertycache
658 def params(self):
660 def params(self):
659 """dictionary of stream level parameters"""
661 """dictionary of stream level parameters"""
660 indebug(self.ui, 'reading bundle2 stream parameters')
662 indebug(self.ui, 'reading bundle2 stream parameters')
661 params = {}
663 params = {}
662 paramssize = self._unpack(_fstreamparamsize)[0]
664 paramssize = self._unpack(_fstreamparamsize)[0]
663 if paramssize < 0:
665 if paramssize < 0:
664 raise error.BundleValueError('negative bundle param size: %i'
666 raise error.BundleValueError('negative bundle param size: %i'
665 % paramssize)
667 % paramssize)
666 if paramssize:
668 if paramssize:
667 params = self._readexact(paramssize)
669 params = self._readexact(paramssize)
668 params = self._processallparams(params)
670 params = self._processallparams(params)
669 return params
671 return params
670
672
671 def _processallparams(self, paramsblock):
673 def _processallparams(self, paramsblock):
672 """"""
674 """"""
673 params = {}
675 params = {}
674 for p in paramsblock.split(' '):
676 for p in paramsblock.split(' '):
675 p = p.split('=', 1)
677 p = p.split('=', 1)
676 p = [urllib.unquote(i) for i in p]
678 p = [urllib.unquote(i) for i in p]
677 if len(p) < 2:
679 if len(p) < 2:
678 p.append(None)
680 p.append(None)
679 self._processparam(*p)
681 self._processparam(*p)
680 params[p[0]] = p[1]
682 params[p[0]] = p[1]
681 return params
683 return params
682
684
683
685
684 def _processparam(self, name, value):
686 def _processparam(self, name, value):
685 """process a parameter, applying its effect if needed
687 """process a parameter, applying its effect if needed
686
688
687 Parameter starting with a lower case letter are advisory and will be
689 Parameter starting with a lower case letter are advisory and will be
688 ignored when unknown. Those starting with an upper case letter are
690 ignored when unknown. Those starting with an upper case letter are
689 mandatory and this function will raise a KeyError when unknown.
691 mandatory and this function will raise a KeyError when unknown.
690
692
691 Note: no options are currently supported. Any input will be either
693 Note: no options are currently supported. Any input will be either
692 ignored or failing.
694 ignored or failing.
693 """
695 """
694 if not name:
696 if not name:
695 raise ValueError('empty parameter name')
697 raise ValueError('empty parameter name')
696 if name[0] not in string.letters:
698 if name[0] not in string.letters:
697 raise ValueError('non letter first character: %r' % name)
699 raise ValueError('non letter first character: %r' % name)
698 try:
700 try:
699 handler = b2streamparamsmap[name.lower()]
701 handler = b2streamparamsmap[name.lower()]
700 except KeyError:
702 except KeyError:
701 if name[0].islower():
703 if name[0].islower():
702 indebug(self.ui, "ignoring unknown parameter %r" % name)
704 indebug(self.ui, "ignoring unknown parameter %r" % name)
703 else:
705 else:
704 raise error.BundleUnknownFeatureError(params=(name,))
706 raise error.BundleUnknownFeatureError(params=(name,))
705 else:
707 else:
706 handler(self, name, value)
708 handler(self, name, value)
707
709
708 def _forwardchunks(self):
710 def _forwardchunks(self):
709 """utility to transfer a bundle2 as binary
711 """utility to transfer a bundle2 as binary
710
712
711 This is made necessary by the fact the 'getbundle' command over 'ssh'
713 This is made necessary by the fact the 'getbundle' command over 'ssh'
712 has no way to know when the reply ends, relying on the bundle to be
714 has no way to know when the reply ends, relying on the bundle to be
713 interpreted to know its end. This is terrible and we are sorry, but we
715 interpreted to know its end. This is terrible and we are sorry, but we
714 needed to move forward to get general delta enabled.
716 needed to move forward to get general delta enabled.
715 """
717 """
716 yield self._magicstring
718 yield self._magicstring
717 assert 'params' not in vars(self)
719 assert 'params' not in vars(self)
718 paramssize = self._unpack(_fstreamparamsize)[0]
720 paramssize = self._unpack(_fstreamparamsize)[0]
719 if paramssize < 0:
721 if paramssize < 0:
720 raise error.BundleValueError('negative bundle param size: %i'
722 raise error.BundleValueError('negative bundle param size: %i'
721 % paramssize)
723 % paramssize)
722 yield _pack(_fstreamparamsize, paramssize)
724 yield _pack(_fstreamparamsize, paramssize)
723 if paramssize:
725 if paramssize:
724 params = self._readexact(paramssize)
726 params = self._readexact(paramssize)
725 self._processallparams(params)
727 self._processallparams(params)
726 yield params
728 yield params
727 assert self._decompressor is util.decompressors[None]
729 assert self._decompressor is util.decompressors[None]
728 # From there, payload might need to be decompressed
730 # From there, payload might need to be decompressed
729 self._fp = self._decompressor(self._fp)
731 self._fp = self._decompressor(self._fp)
730 emptycount = 0
732 emptycount = 0
731 while emptycount < 2:
733 while emptycount < 2:
732 # so we can brainlessly loop
734 # so we can brainlessly loop
733 assert _fpartheadersize == _fpayloadsize
735 assert _fpartheadersize == _fpayloadsize
734 size = self._unpack(_fpartheadersize)[0]
736 size = self._unpack(_fpartheadersize)[0]
735 yield _pack(_fpartheadersize, size)
737 yield _pack(_fpartheadersize, size)
736 if size:
738 if size:
737 emptycount = 0
739 emptycount = 0
738 else:
740 else:
739 emptycount += 1
741 emptycount += 1
740 continue
742 continue
741 if size == flaginterrupt:
743 if size == flaginterrupt:
742 continue
744 continue
743 elif size < 0:
745 elif size < 0:
744 raise error.BundleValueError('negative chunk size: %i')
746 raise error.BundleValueError('negative chunk size: %i')
745 yield self._readexact(size)
747 yield self._readexact(size)
746
748
747
749
748 def iterparts(self):
750 def iterparts(self):
749 """yield all parts contained in the stream"""
751 """yield all parts contained in the stream"""
750 # make sure param have been loaded
752 # make sure param have been loaded
751 self.params
753 self.params
752 # From there, payload need to be decompressed
754 # From there, payload need to be decompressed
753 self._fp = self._decompressor(self._fp)
755 self._fp = self._decompressor(self._fp)
754 indebug(self.ui, 'start extraction of bundle2 parts')
756 indebug(self.ui, 'start extraction of bundle2 parts')
755 headerblock = self._readpartheader()
757 headerblock = self._readpartheader()
756 while headerblock is not None:
758 while headerblock is not None:
757 part = unbundlepart(self.ui, headerblock, self._fp)
759 part = unbundlepart(self.ui, headerblock, self._fp)
758 yield part
760 yield part
759 part.seek(0, 2)
761 part.seek(0, 2)
760 headerblock = self._readpartheader()
762 headerblock = self._readpartheader()
761 indebug(self.ui, 'end of bundle2 stream')
763 indebug(self.ui, 'end of bundle2 stream')
762
764
763 def _readpartheader(self):
765 def _readpartheader(self):
764 """reads a part header size and return the bytes blob
766 """reads a part header size and return the bytes blob
765
767
766 returns None if empty"""
768 returns None if empty"""
767 headersize = self._unpack(_fpartheadersize)[0]
769 headersize = self._unpack(_fpartheadersize)[0]
768 if headersize < 0:
770 if headersize < 0:
769 raise error.BundleValueError('negative part header size: %i'
771 raise error.BundleValueError('negative part header size: %i'
770 % headersize)
772 % headersize)
771 indebug(self.ui, 'part header size: %i' % headersize)
773 indebug(self.ui, 'part header size: %i' % headersize)
772 if headersize:
774 if headersize:
773 return self._readexact(headersize)
775 return self._readexact(headersize)
774 return None
776 return None
775
777
776 def compressed(self):
778 def compressed(self):
777 return False
779 return False
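On the reading side, a sketch of walking the parts of a bundle from an open file object (ui and fp assumed; iterparts consumes each part's payload after the yield, so this only inspects headers):

def _listparttypes(ui, fp):
    unbundler = getunbundler(ui, fp)   # expects the 'HG20' magic string
    return [(part.type, part.mandatory) for part in unbundler.iterparts()]
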
778
780
779 formatmap = {'20': unbundle20}
781 formatmap = {'20': unbundle20}
780
782
781 b2streamparamsmap = {}
783 b2streamparamsmap = {}
782
784
783 def b2streamparamhandler(name):
785 def b2streamparamhandler(name):
784 """register a handler for a stream level parameter"""
786 """register a handler for a stream level parameter"""
785 def decorator(func):
787 def decorator(func):
786 assert name not in formatmap
788 assert name not in formatmap
787 b2streamparamsmap[name] = func
789 b2streamparamsmap[name] = func
788 return func
790 return func
789 return decorator
791 return decorator
790
792
791 @b2streamparamhandler('compression')
793 @b2streamparamhandler('compression')
792 def processcompression(unbundler, param, value):
794 def processcompression(unbundler, param, value):
793 """read compression parameter and install payload decompression"""
795 """read compression parameter and install payload decompression"""
794 if value not in util.decompressors:
796 if value not in util.decompressors:
795 raise error.BundleUnknownFeatureError(params=(param,),
797 raise error.BundleUnknownFeatureError(params=(param,),
796 values=(value,))
798 values=(value,))
797 unbundler._decompressor = util.decompressors[value]
799 unbundler._decompressor = util.decompressors[value]
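New stream-level parameters would be wired up the same way as 'compression'. A purely hypothetical registration of an advisory parameter (the 'origin' parameter does not exist in bundle2):

@b2streamparamhandler('origin')
def processorigin(unbundler, param, value):
    '''illustrative handler: advisory parameter, only worth a debug line'''
    indebug(unbundler.ui, 'bundle generated by %s' % (value or 'unknown'))
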
798
800
799 class bundlepart(object):
801 class bundlepart(object):
800 """A bundle2 part contains application level payload
802 """A bundle2 part contains application level payload
801
803
802 The part `type` is used to route the part to the application level
804 The part `type` is used to route the part to the application level
803 handler.
805 handler.
804
806
805 The part payload is contained in ``part.data``. It could be raw bytes or a
807 The part payload is contained in ``part.data``. It could be raw bytes or a
806 generator of byte chunks.
808 generator of byte chunks.
807
809
808 You can add parameters to the part using the ``addparam`` method.
810 You can add parameters to the part using the ``addparam`` method.
809 Parameters can be either mandatory (default) or advisory. Remote side
811 Parameters can be either mandatory (default) or advisory. Remote side
810 should be able to safely ignore the advisory ones.
812 should be able to safely ignore the advisory ones.
811
813
812 Both data and parameters cannot be modified after the generation has begun.
814 Both data and parameters cannot be modified after the generation has begun.
813 """
815 """
814
816
815 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
817 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
816 data='', mandatory=True):
818 data='', mandatory=True):
817 validateparttype(parttype)
819 validateparttype(parttype)
818 self.id = None
820 self.id = None
819 self.type = parttype
821 self.type = parttype
820 self._data = data
822 self._data = data
821 self._mandatoryparams = list(mandatoryparams)
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise RuntimeError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to define the part content
    def __setdata(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data
    def __getdata(self):
        return self._data
    data = property(__getdata, __setdata)

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))
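    # Illustrative usage (sketch with assumed names, not part of this change):
    # parameters added with ``mandatory=True`` (the default) must be
    # understood by the reader, advisory ones may be silently ignored.
    #
    #     part = bundler.newpart('changegroup', data=cgchunks)
    #     part.addparam('version', '02')                   # mandatory
    #     part.addparam('nbchanges', '3', mandatory=False)  # advisory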

    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise RuntimeError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
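        # On-the-wire layout of the header built below (it mirrors the
        # unpacking done in ``unbundlepart._readheader``): the part type
        # length (_fparttypesize) and the type string, the part id
        # (_fpartid), the mandatory and advisory parameter counts
        # (_fpartparamcount), the sizes of every key and value, then the keys
        # and values themselves. The whole blob is preceded by its own size
        # (_fpartheadersize).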
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                 ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
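        # Payload framing: each chunk is preceded by its size (_fpayloadsize,
        # a signed int32). A size of 0 terminates the payload, and the
        # special negative value ``flaginterrupt`` (-1) announces an
        # out-of-band part, used below to carry an error raised while the
        # payload was being generated.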
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % exc)
            exc_info = sys.exc_info()
            msg = 'unexpected error: %s' % exc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            raise exc_info[0], exc_info[1], exc_info[2]
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if util.safehasattr(self.data, 'next'):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting an exception raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""
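    # Flow summary (descriptive note, not part of the original change): when
    # ``unbundlepart._payloadchunks`` encounters the ``flaginterrupt`` marker
    # it instantiates this handler, which reads exactly one part from the
    # stream and processes it against an ``interruptoperation`` that exposes
    # only the ui (no repo, no transaction).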

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads the part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise RuntimeError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] #(payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to setup all logic related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = dict(self.mandatoryparams)
        self.params.update(dict(self.advisoryparams))
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
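        # The chunk index maps payload offsets to file offsets so ``seek``
        # can later resume reading at a chunk boundary instead of replaying
        # the whole payload. Entries are appended lazily as chunks are read.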
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, super(unbundlepart, self).tell()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            super(unbundlepart, self).seek(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos,
                                             super(unbundlepart, self).tell()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % self.id)
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param values
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
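        # whence follows the usual file semantics: 0 = absolute, 1 = relative
        # to the current position, 2 = relative to the end of the payload.
        # Seeking may force reading the remaining payload so that the chunk
        # index covers the target offset.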
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
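    # Note: ``capable()`` returns False when the capability is missing and a
    # (possibly empty) string when it is advertised. An empty string still
    # means bundle2 is supported, just with no extra capabilities, so only a
    # genuinely absent capability yields an empty dict here.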
    if not raw and raw != '':
        return {}
    capsblob = urllib.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]
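# Illustrative example (not from the original change): with
# caps = {'obsmarkers': ('V0', 'V1')}, obsmarkersversion(caps) returns [0, 1].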

@parthandler('changegroup', ('version', 'nbchanges'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will see massive rework before
    being inflicted on any end-user.
    """
    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    unpacker = changegroup.packermap[unpackerversion][1]
    cg = unpacker(inpart, None)
    # the source and url passed here are overwritten by the ones contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()
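# Illustrative sender-side sketch (assumed names, not part of this change):
#
#     bundler = bundle20(op.ui, bundle2caps(remote))
#     part = bundler.newpart('changegroup', data=cg.getchunks())
#     part.addparam('version', '02')
#     part.addparam('nbchanges', str(expectedcount), mandatory=False)
#
# The 'version' parameter selects the changegroup unpacker used above and
# 'nbchanges' only feeds progress reporting through ``expectedtotal``.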

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
    ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given a url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate that what was
        retrieved by the client matches the server's knowledge of the bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest with
        that name. Like the size, it is used to validate that what was
        retrieved by the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
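    # ``util.digestchecker`` wraps the opened url stream so the bytes read
    # are counted and fed to the requested digest algorithms; the
    # ``validate()`` call further down aborts if the size or any digest does
    # not match what the part advertised.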

    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = cg.apply(op.repo, 'bundle2', 'bundle2')
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
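    # The payload is simply the concatenation of the 20-byte binary nodes of
    # the heads recorded when the bundle was created; it is compared against
    # the current repository heads below.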
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if heads != op.repo.heads():
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(('remote: %s\n' % line))

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise error.Abort(inpart.params['message'], hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # we want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)