bundle2: make unbundle.compressed return True when compressed...
Pierre-Yves David - r26802:42f705f2 default
@@ -1,1536 +1,1540 @@
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows:

 - magic string
 - stream level parameters
 - payload parts (any number)
 - end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

The binary format is as follows:

:params size: int32

  The total number of bytes used by the parameters

:params value: arbitrary number of bytes

  A blob of `params size` containing the serialized version of all stream
  level parameters.

  The blob contains a space separated list of parameters. Parameters with a
  value are stored in the form `<name>=<value>`. Both name and value are
  urlquoted.

  Empty names are forbidden.

  A name MUST start with a letter. If that first letter is lower case, the
  parameter is advisory and can be safely ignored. If the first letter is
  capital, the parameter is mandatory and the bundling process MUST stop if
  it is unable to process it.

  Stream parameters use a simple textual format for two main reasons:

  - Stream level parameters should remain simple and we want to discourage any
    crazy usage.
  - Textual data allows easy human inspection of a bundle2 header in case of
    trouble.

  Any application level options MUST go into a bundle2 part instead.

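As an illustration only (an example sketch, not a normative part of the
format description), a stream carrying the mandatory `Compression` parameter
set to `GZ` plus a valueless advisory `someflag` parameter would serialize
the blob as::

    Compression=GZ someflag

preceded by an int32 giving the blob's length.
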
Payload part
------------------------

The binary format is as follows:

:header size: int32

  The total number of bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

    The header defines how to interpret the part. It contains two pieces of
    data: the part type, and the part parameters.

    The part type is used to route the part to an application level handler
    that can interpret the payload.

    Part parameters are passed to the application level handler. They are
    meant to convey information that will help the application level object
    interpret the part payload.

    The binary format of the header is as follows:

    :typesize: (one byte)

    :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

    :partid: A 32bits integer (unique in the bundle) that can be used to refer
             to this part.

    :parameters:

        A part's parameters may have arbitrary content, the binary structure
        is::

            <mandatory-count><advisory-count><param-sizes><param-data>

        :mandatory-count: 1 byte, number of mandatory parameters

        :advisory-count:  1 byte, number of advisory parameters

        :param-sizes:

            N couples of bytes, where N is the total number of parameters.
            Each couple contains (<size-of-key>, <size-of-value>) for one
            parameter.

        :param-data:

            A blob of bytes from which each parameter key and value can be
            retrieved using the list of size couples stored in the previous
            field.

            Mandatory parameters come first, then the advisory ones.

            Each parameter's key MUST be unique within the part.

:payload:

    The payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

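As an illustration only (an example sketch, not a normative part of the
format description), a mandatory part of type `TEST` with id 0, no parameters
and a 4 byte payload would be framed roughly as::

    <int32: header size>              covers the following two lines
    <byte: 4> 'TEST' <int32: 0>       part type size, part type, part id
    <byte: 0> <byte: 0>               no mandatory, no advisory parameters
    <int32: 4> <4 bytes of data>      a single payload chunk
    <int32: 0>                        zero size chunk concluding the payload
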
Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part type
contains any uppercase char it is considered mandatory. When no handler is
known for a mandatory part, the process is aborted and an exception is raised.
If the part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read after an abort are processed. In
the future, dropping the stream may become an option for channels we do not
care to preserve.
"""

from __future__ import absolute_import

import errno
import re
import string
import struct
import sys
import urllib

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    pushkey,
    tags,
    url,
    util,
)

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

preferedchunksize = 4096

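# Illustrative note (not part of this change): these struct formats implement
# the wire layout described in the module docstring; for example, an empty
# part header doubles as the end-of-stream marker.
#
#   _pack(_fstreamparamsize, 0)    # four zero bytes: no stream parameters
#   _pack(_fpartheadersize, 0)     # end of stream marker
#   _unpack(_fpayloadsize, '\x00\x00\x10\x00')   # -> (4096,): one chunk size
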
_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>'+('BB'*nbparams)

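# Illustrative note (not part of this change): the generated format is one
# (key size, value size) byte pair per parameter, e.g.
#
#   _makefpartparamsizes(2)    # -> '>BBBB'
#   _unpack(_makefpartparamsizes(2), '\x03\x01\x04\x02')    # -> (3, 1, 4, 2)
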
parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`. Where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

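# Illustrative usage sketch (not part of this change): records accumulate per
# category and preserve overall chronological order.
#
#   records = unbundlerecords()
#   records.add('changegroup', {'return': 1})
#   records.add('output', 'remote: ok')
#   records['changegroup']    # -> ({'return': 1},)
#   list(records)             # -> [('changegroup', {'return': 1}),
#                             #     ('output', 'remote: ok')]
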
class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.gettransaction = transactiongetter
        self.reply = None
        self.captureoutput = captureoutput

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    tr.hookargs['bundle2'] = '1'
    if source is not None and 'source' not in tr.hookargs:
        tr.hookargs['source'] = source
    if url is not None and 'url' not in tr.hookargs:
        tr.hookargs['url'] = url
    return processbundle(repo, unbundler, lambda: tr, op=op)

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    This is a very early version of this function that will be strongly
    reworked before final usage.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op.gettransaction is None:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except BaseException as exc:
        for nbpart, part in iterparts:
            # consume the bundle content
            part.seek(0, 2)
        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from processing the old format. This is mostly needed
        # to handle different return codes to unbundle according to the type
        # of bundle. We should probably clean up or drop this return code
        # craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged
        raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op

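# Illustrative usage sketch (not part of this change): 'repo', 'fp' and 'tr'
# are assumed to already exist, with 'tr' an open transaction.
#
#   unbundler = getunbundler(repo.ui, fp)
#   op = applybundle(repo, unbundler, tr, source='push')
#   op.records['changegroup']    # records gathered while processing parts
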
def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                      params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam('in-reply-to', str(part.id), mandatory=False)
    finally:
        # consume the part content to not corrupt the stream.
        part.seek(0, 2)


def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line).
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urllib.unquote(key)
        vals = [urllib.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urllib.quote(ca)
        vals = [urllib.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

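# Illustrative round-trip sketch (not part of this change):
#
#   encodecaps({'HG20': [], 'changegroup': ['01', '02']})
#   # -> 'HG20\nchangegroup=01,02'
#   decodecaps('HG20\nchangegroup=01,02')
#   # -> {'HG20': [], 'changegroup': ['01', '02']}
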
class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compressor = util.compressors[None]()

    def setcompression(self, alg):
        """setup core part compression to <alg>"""
        if alg is None:
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compressor = util.compressors[alg]()

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the containers

        The part is directly added to the containers. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        # starting compression
        for chunk in self._getcorechunk():
            yield self._compressor.compress(chunk)
        yield self._compressor.flush()

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urllib.quote(par)
            if value is not None:
                value = urllib.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged

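# Illustrative usage sketch (not part of this change; 'ui' is assumed to
# exist): assembling a bundle with one advisory part and serializing it.
#
#   bundler = bundle20(ui)
#   bundler.newpart('output', data='hello', mandatory=False)
#   raw = ''.join(bundler.getchunks())   # 'HG20' + params + parts + end marker
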
class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))

    def _unpack(self, format):
        """unpack this struct format from the stream"""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream"""
        return changegroup.readexactly(self._fp, size)

    def seek(self, offset, whence=0):
        """move the underlying file pointer"""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def tell(self):
        """return the file offset, or None if file is not seekable"""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` methods."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._decompressor = util.decompressors[None]
+       self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """decode and process a block of stream level parameters"""
        params = {}
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urllib.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory, and this function will raise a BundleUnknownFeatureError
        for an unknown one.

        Note: no option is currently supported. Any input will either be
        ignored or fail.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact the 'getbundle' command over 'ssh'
        has no way to know when the reply ends, relying on the bundle to be
        interpreted to know its end. This is terrible and we are sorry, but we
        needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._decompressor is util.decompressors[None]
        # From there, payload might need to be decompressed
        self._fp = self._decompressor(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)


    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._decompressor(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """reads a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
-        return False
+        self.params # load params
+        return self._compressed

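# Illustrative usage sketch (not part of this change; 'ui' and 'fp' are
# assumed to exist): reading a bundle2 stream and checking whether its payload
# was transmitted compressed, which is what this changeset makes visible.
#
#   unbundler = getunbundler(ui, fp)      # dispatches on the 'HG20' magic
#   for part in unbundler.iterparts():
#       pass                              # leftover part data is consumed
#   unbundler.compressed()                # True when a 'Compression' stream
#                                         # parameter was handled
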
783 formatmap = {'20': unbundle20}
785 formatmap = {'20': unbundle20}
784
786
785 b2streamparamsmap = {}
787 b2streamparamsmap = {}
786
788
787 def b2streamparamhandler(name):
789 def b2streamparamhandler(name):
788 """register a handler for a stream level parameter"""
790 """register a handler for a stream level parameter"""
789 def decorator(func):
791 def decorator(func):
790 assert name not in formatmap
792 assert name not in formatmap
791 b2streamparamsmap[name] = func
793 b2streamparamsmap[name] = func
792 return func
794 return func
793 return decorator
795 return decorator
794
796
795 @b2streamparamhandler('compression')
797 @b2streamparamhandler('compression')
796 def processcompression(unbundler, param, value):
798 def processcompression(unbundler, param, value):
797 """read compression parameter and install payload decompression"""
799 """read compression parameter and install payload decompression"""
798 if value not in util.decompressors:
800 if value not in util.decompressors:
799 raise error.BundleUnknownFeatureError(params=(param,),
801 raise error.BundleUnknownFeatureError(params=(param,),
800 values=(value,))
802 values=(value,))
801 unbundler._decompressor = util.decompressors[value]
803 unbundler._decompressor = util.decompressors[value]
804 if value is not None:
805 unbundler._compressed = True
802
806
803 class bundlepart(object):
807 class bundlepart(object):
804 """A bundle2 part contains application level payload
808 """A bundle2 part contains application level payload
805
809
806 The part `type` is used to route the part to the application level
810 The part `type` is used to route the part to the application level
807 handler.
811 handler.
808
812
809 The part payload is contained in ``part.data``. It could be raw bytes or a
813 The part payload is contained in ``part.data``. It could be raw bytes or a
810 generator of byte chunks.
814 generator of byte chunks.
811
815
812 You can add parameters to the part using the ``addparam`` method.
816 You can add parameters to the part using the ``addparam`` method.
813 Parameters can be either mandatory (default) or advisory. Remote side
817 Parameters can be either mandatory (default) or advisory. Remote side
814 should be able to safely ignore the advisory ones.
818 should be able to safely ignore the advisory ones.
815
819
816 Both data and parameters cannot be modified after the generation has begun.
820 Both data and parameters cannot be modified after the generation has begun.
817 """
821 """
818
822
819 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
823 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
820 data='', mandatory=True):
824 data='', mandatory=True):
821 validateparttype(parttype)
825 validateparttype(parttype)
822 self.id = None
826 self.id = None
823 self.type = parttype
827 self.type = parttype
824 self._data = data
828 self._data = data
825 self._mandatoryparams = list(mandatoryparams)
829 self._mandatoryparams = list(mandatoryparams)
826 self._advisoryparams = list(advisoryparams)
830 self._advisoryparams = list(advisoryparams)
827 # checking for duplicated entries
831 # checking for duplicated entries
828 self._seenparams = set()
832 self._seenparams = set()
829 for pname, __ in self._mandatoryparams + self._advisoryparams:
833 for pname, __ in self._mandatoryparams + self._advisoryparams:
830 if pname in self._seenparams:
834 if pname in self._seenparams:
831 raise RuntimeError('duplicated params: %s' % pname)
835 raise RuntimeError('duplicated params: %s' % pname)
832 self._seenparams.add(pname)
836 self._seenparams.add(pname)
833 # status of the part's generation:
837 # status of the part's generation:
834 # - None: not started,
838 # - None: not started,
835 # - False: currently generated,
839 # - False: currently generated,
836 # - True: generation done.
840 # - True: generation done.
837 self._generated = None
841 self._generated = None
838 self.mandatory = mandatory
842 self.mandatory = mandatory
839
843
840 def copy(self):
844 def copy(self):
841 """return a copy of the part
845 """return a copy of the part
842
846
843 The new part have the very same content but no partid assigned yet.
847 The new part have the very same content but no partid assigned yet.
844 Parts with generated data cannot be copied."""
848 Parts with generated data cannot be copied."""
845 assert not util.safehasattr(self.data, 'next')
849 assert not util.safehasattr(self.data, 'next')
846 return self.__class__(self.type, self._mandatoryparams,
850 return self.__class__(self.type, self._mandatoryparams,
847 self._advisoryparams, self._data, self.mandatory)
851 self._advisoryparams, self._data, self.mandatory)
848
852
849 # methods used to defines the part content
853 # methods used to defines the part content
850 def __setdata(self, data):
854 def __setdata(self, data):
851 if self._generated is not None:
855 if self._generated is not None:
852 raise error.ReadOnlyPartError('part is being generated')
856 raise error.ReadOnlyPartError('part is being generated')
853 self._data = data
857 self._data = data
854 def __getdata(self):
858 def __getdata(self):
855 return self._data
859 return self._data
856 data = property(__getdata, __setdata)
860 data = property(__getdata, __setdata)
857
861
858 @property
862 @property
859 def mandatoryparams(self):
863 def mandatoryparams(self):
860 # make it an immutable tuple to force people through ``addparam``
864 # make it an immutable tuple to force people through ``addparam``
861 return tuple(self._mandatoryparams)
865 return tuple(self._mandatoryparams)
862
866
863 @property
867 @property
864 def advisoryparams(self):
868 def advisoryparams(self):
865 # make it an immutable tuple to force people through ``addparam``
869 # make it an immutable tuple to force people through ``addparam``
866 return tuple(self._advisoryparams)
870 return tuple(self._advisoryparams)
867
871
    def addparam(self, name, value='', mandatory=True):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise RuntimeError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                 ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % exc)
            exc_info = sys.exc_info()
            msg = 'unexpected error: %s' % exc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            raise exc_info[0], exc_info[1], exc_info[2]
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if util.safehasattr(self.data, 'next'):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data

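# Illustrative sketch (not part of the original module): how client code is
# expected to build a part with the ``bundlepart`` API above. The part type
# and parameter name below are invented for the example.
def _example_build_part():
    part = bundlepart('output', data='hello from the server\n',
                      mandatory=False)
    # advisory parameters may be ignored by the receiver, mandatory ones may
    # not; ``addparam`` routes each one to the matching internal list.
    part.addparam('origin', 'example', mandatory=False)
    assert part.advisoryparams == (('origin', 'example'),)
    # Once ``getchunks()`` starts producing the binary stream the part turns
    # read-only and further ``addparam`` calls raise error.ReadOnlyPartError.
    return part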
flaginterrupt = -1
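# Minimal sketch (not part of the original module) of the payload framing
# produced by ``getchunks`` above and consumed by ``unbundlepart`` below:
# each chunk is preceded by a ``_fpayloadsize`` integer, a size of 0 closes
# the payload and ``flaginterrupt`` (-1) announces an out-of-band part.
def _example_iter_payload(fp):
    fmtsize = struct.calcsize(_fpayloadsize)
    size = _unpack(_fpayloadsize, fp.read(fmtsize))[0]
    while size:
        if size == flaginterrupt:
            # a nested part follows; the real reader delegates to
            # ``interrupthandler`` (defined below) instead of giving up.
            raise NotImplementedError('interruption parts not handled here')
        yield fp.read(size)
        size = _unpack(_fpayloadsize, fp.read(fmtsize))[0]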

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting exceptions raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise RuntimeError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] #(payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic-related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = dict(self.mandatoryparams)
        self.params.update(dict(self.advisoryparams))
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, super(unbundlepart, self).tell()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            super(unbundlepart, self).seek(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos,
                                             super(unbundlepart, self).tell()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def _readheader(self):
        """read the header and set up the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % self.id)
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

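# Illustrative sketch (not part of the original module) of the chunk index
# that ``_payloadchunks`` records and ``_findchunk``/``seek`` consult above.
# The offsets below are invented.
def _example_chunkindex_lookup():
    # (payload position, file position) recorded at the start of each chunk
    chunkindex = [(0, 100), (4096, 4208), (8192, 12316)]
    def findchunk(pos):
        for chunk, (ppos, fpos) in enumerate(chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')
    # seeking to payload offset 5000 means replaying chunk 1 and then
    # discarding 904 bytes, which is exactly what ``seek`` does above.
    assert findchunk(4096) == (1, 0)
    assert findchunk(5000) == (1, 904)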
# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urllib.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

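# Usage sketch (not part of the original module): an extension checking
# whether a peer advertises support for a part type before adding one to an
# outgoing bundle. ``remote`` stands for any peer object exposing
# ``capable()``.
def _example_peer_supports(remote, parttype):
    b2caps = bundle2caps(remote)
    return parttype in b2caps  # e.g. 'pushkey' or 'hgtagsfnodes'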
def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]
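# Minimal usage sketch (not part of the original module): picking the newest
# obsmarkers format both sides understand. The capability values and the
# local version list are invented for the example.
def _example_common_obsmarkers_version():
    remotecaps = {'obsmarkers': ('V0', 'V1')}
    localversions = [0, 1]  # would come from obsolete.formats on a real setup
    common = set(obsmarkersversion(remotecaps)) & set(localversions)
    return max(common)  # -> 1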

@parthandler('changegroup', ('version', 'nbchanges'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will be massively reworked before
    being inflicted on any end-user.
    """
    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    unpacker = changegroup.packermap[unpackerversion][1]
    cg = unpacker(inpart, None)
    # the source and url passed here are overwritten by the ones contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
    ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given a url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate what was
        retrieved by the client matches the server knowledge about the bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest with
        that name. Like the size, it is used to validate what was retrieved by
        the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = cg.apply(op.repo, 'bundle2', 'bundle2')
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if heads != op.repo.heads():
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

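# Illustrative sketch (not part of the original module): the ``check:heads``
# payload is just the concatenation of the 20-byte binary heads the client
# observed before pushing, which the handler above re-reads 20 bytes at a
# time. The node values below are fabricated.
def _example_checkheads_part():
    heads = ['\x11' * 20, '\x22' * 20]  # binary changeset ids
    return bundlepart('check:heads', data=''.join(heads))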
@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(('remote: %s\n' % line))

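# Sketch of how an extension could register its own handler with the
# ``parthandler`` decorator, mirroring the handlers in this file. The part
# name, parameter and record key below are hypothetical.
@parthandler('x-example:echo', ('greeting',))
def _examplehandleecho(op, inpart):
    """log a greeting and record it (illustrative only)"""
    greeting = inpart.params['greeting']
    payload = inpart.read()
    op.ui.debug('x-example-echo: %s (%i bytes of payload)\n'
                % (greeting, len(payload)))
    op.records.add('x-example:echo', {'greeting': greeting})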

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise error.Abort(inpart.params['message'], hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing the
    # pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # we want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
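# Illustrative sketch (not part of the original module): the ``hgtagsfnodes``
# payload consumed above is a flat sequence of (changeset node, .hgtags
# filenode) pairs, 20 bytes each. The helper name is invented.
def _example_hgtagsfnodes_payload(entries):
    """entries: iterable of (node, fnode) pairs of 20-byte strings"""
    return ''.join(node + fnode for node, fnode in entries)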