##// END OF EJS Templates
bundle: remove obsolete (and duplicate) comment...
Martin von Zweigbergk -
r28672:ca489611 default
parent child Browse files
Show More
@@ -1,1610 +1,1606 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part header. When the header is empty
69 The total number of Bytes used by the part header. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 from __future__ import absolute_import
148 from __future__ import absolute_import
149
149
150 import errno
150 import errno
151 import re
151 import re
152 import string
152 import string
153 import struct
153 import struct
154 import sys
154 import sys
155 import urllib
155 import urllib
156
156
157 from .i18n import _
157 from .i18n import _
158 from . import (
158 from . import (
159 changegroup,
159 changegroup,
160 error,
160 error,
161 obsolete,
161 obsolete,
162 pushkey,
162 pushkey,
163 tags,
163 tags,
164 url,
164 url,
165 util,
165 util,
166 )
166 )
167
167
168 _pack = struct.pack
168 _pack = struct.pack
169 _unpack = struct.unpack
169 _unpack = struct.unpack
170
170
171 _fstreamparamsize = '>i'
171 _fstreamparamsize = '>i'
172 _fpartheadersize = '>i'
172 _fpartheadersize = '>i'
173 _fparttypesize = '>B'
173 _fparttypesize = '>B'
174 _fpartid = '>I'
174 _fpartid = '>I'
175 _fpayloadsize = '>i'
175 _fpayloadsize = '>i'
176 _fpartparamcount = '>BB'
176 _fpartparamcount = '>BB'
177
177
178 preferedchunksize = 4096
178 preferedchunksize = 4096
179
179
180 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
180 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
181
181
182 def outdebug(ui, message):
182 def outdebug(ui, message):
183 """debug regarding output stream (bundling)"""
183 """debug regarding output stream (bundling)"""
184 if ui.configbool('devel', 'bundle2.debug', False):
184 if ui.configbool('devel', 'bundle2.debug', False):
185 ui.debug('bundle2-output: %s\n' % message)
185 ui.debug('bundle2-output: %s\n' % message)
186
186
187 def indebug(ui, message):
187 def indebug(ui, message):
188 """debug on input stream (unbundling)"""
188 """debug on input stream (unbundling)"""
189 if ui.configbool('devel', 'bundle2.debug', False):
189 if ui.configbool('devel', 'bundle2.debug', False):
190 ui.debug('bundle2-input: %s\n' % message)
190 ui.debug('bundle2-input: %s\n' % message)
191
191
192 def validateparttype(parttype):
192 def validateparttype(parttype):
193 """raise ValueError if a parttype contains invalid character"""
193 """raise ValueError if a parttype contains invalid character"""
194 if _parttypeforbidden.search(parttype):
194 if _parttypeforbidden.search(parttype):
195 raise ValueError(parttype)
195 raise ValueError(parttype)
196
196
197 def _makefpartparamsizes(nbparams):
197 def _makefpartparamsizes(nbparams):
198 """return a struct format to read part parameter sizes
198 """return a struct format to read part parameter sizes
199
199
200 The number parameters is variable so we need to build that format
200 The number parameters is variable so we need to build that format
201 dynamically.
201 dynamically.
202 """
202 """
203 return '>'+('BB'*nbparams)
203 return '>'+('BB'*nbparams)
204
204
205 parthandlermapping = {}
205 parthandlermapping = {}
206
206
207 def parthandler(parttype, params=()):
207 def parthandler(parttype, params=()):
208 """decorator that register a function as a bundle2 part handler
208 """decorator that register a function as a bundle2 part handler
209
209
210 eg::
210 eg::
211
211
212 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
212 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
213 def myparttypehandler(...):
213 def myparttypehandler(...):
214 '''process a part of type "my part".'''
214 '''process a part of type "my part".'''
215 ...
215 ...
216 """
216 """
217 validateparttype(parttype)
217 validateparttype(parttype)
218 def _decorator(func):
218 def _decorator(func):
219 lparttype = parttype.lower() # enforce lower case matching.
219 lparttype = parttype.lower() # enforce lower case matching.
220 assert lparttype not in parthandlermapping
220 assert lparttype not in parthandlermapping
221 parthandlermapping[lparttype] = func
221 parthandlermapping[lparttype] = func
222 func.params = frozenset(params)
222 func.params = frozenset(params)
223 return func
223 return func
224 return _decorator
224 return _decorator
225
225
226 class unbundlerecords(object):
226 class unbundlerecords(object):
227 """keep record of what happens during and unbundle
227 """keep record of what happens during and unbundle
228
228
229 New records are added using `records.add('cat', obj)`. Where 'cat' is a
229 New records are added using `records.add('cat', obj)`. Where 'cat' is a
230 category of record and obj is an arbitrary object.
230 category of record and obj is an arbitrary object.
231
231
232 `records['cat']` will return all entries of this category 'cat'.
232 `records['cat']` will return all entries of this category 'cat'.
233
233
234 Iterating on the object itself will yield `('category', obj)` tuples
234 Iterating on the object itself will yield `('category', obj)` tuples
235 for all entries.
235 for all entries.
236
236
237 All iterations happens in chronological order.
237 All iterations happens in chronological order.
238 """
238 """
239
239
240 def __init__(self):
240 def __init__(self):
241 self._categories = {}
241 self._categories = {}
242 self._sequences = []
242 self._sequences = []
243 self._replies = {}
243 self._replies = {}
244
244
245 def add(self, category, entry, inreplyto=None):
245 def add(self, category, entry, inreplyto=None):
246 """add a new record of a given category.
246 """add a new record of a given category.
247
247
248 The entry can then be retrieved in the list returned by
248 The entry can then be retrieved in the list returned by
249 self['category']."""
249 self['category']."""
250 self._categories.setdefault(category, []).append(entry)
250 self._categories.setdefault(category, []).append(entry)
251 self._sequences.append((category, entry))
251 self._sequences.append((category, entry))
252 if inreplyto is not None:
252 if inreplyto is not None:
253 self.getreplies(inreplyto).add(category, entry)
253 self.getreplies(inreplyto).add(category, entry)
254
254
255 def getreplies(self, partid):
255 def getreplies(self, partid):
256 """get the records that are replies to a specific part"""
256 """get the records that are replies to a specific part"""
257 return self._replies.setdefault(partid, unbundlerecords())
257 return self._replies.setdefault(partid, unbundlerecords())
258
258
259 def __getitem__(self, cat):
259 def __getitem__(self, cat):
260 return tuple(self._categories.get(cat, ()))
260 return tuple(self._categories.get(cat, ()))
261
261
262 def __iter__(self):
262 def __iter__(self):
263 return iter(self._sequences)
263 return iter(self._sequences)
264
264
265 def __len__(self):
265 def __len__(self):
266 return len(self._sequences)
266 return len(self._sequences)
267
267
268 def __nonzero__(self):
268 def __nonzero__(self):
269 return bool(self._sequences)
269 return bool(self._sequences)
270
270
271 class bundleoperation(object):
271 class bundleoperation(object):
272 """an object that represents a single bundling process
272 """an object that represents a single bundling process
273
273
274 Its purpose is to carry unbundle-related objects and states.
274 Its purpose is to carry unbundle-related objects and states.
275
275
276 A new object should be created at the beginning of each bundle processing.
276 A new object should be created at the beginning of each bundle processing.
277 The object is to be returned by the processing function.
277 The object is to be returned by the processing function.
278
278
279 The object has very little content now it will ultimately contain:
279 The object has very little content now it will ultimately contain:
280 * an access to the repo the bundle is applied to,
280 * an access to the repo the bundle is applied to,
281 * a ui object,
281 * a ui object,
282 * a way to retrieve a transaction to add changes to the repo,
282 * a way to retrieve a transaction to add changes to the repo,
283 * a way to record the result of processing each part,
283 * a way to record the result of processing each part,
284 * a way to construct a bundle response when applicable.
284 * a way to construct a bundle response when applicable.
285 """
285 """
286
286
287 def __init__(self, repo, transactiongetter, captureoutput=True):
287 def __init__(self, repo, transactiongetter, captureoutput=True):
288 self.repo = repo
288 self.repo = repo
289 self.ui = repo.ui
289 self.ui = repo.ui
290 self.records = unbundlerecords()
290 self.records = unbundlerecords()
291 self.gettransaction = transactiongetter
291 self.gettransaction = transactiongetter
292 self.reply = None
292 self.reply = None
293 self.captureoutput = captureoutput
293 self.captureoutput = captureoutput
294
294
295 class TransactionUnavailable(RuntimeError):
295 class TransactionUnavailable(RuntimeError):
296 pass
296 pass
297
297
298 def _notransaction():
298 def _notransaction():
299 """default method to get a transaction while processing a bundle
299 """default method to get a transaction while processing a bundle
300
300
301 Raise an exception to highlight the fact that no transaction was expected
301 Raise an exception to highlight the fact that no transaction was expected
302 to be created"""
302 to be created"""
303 raise TransactionUnavailable()
303 raise TransactionUnavailable()
304
304
305 def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
305 def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
306 # transform me into unbundler.apply() as soon as the freeze is lifted
306 # transform me into unbundler.apply() as soon as the freeze is lifted
307 tr.hookargs['bundle2'] = '1'
307 tr.hookargs['bundle2'] = '1'
308 if source is not None and 'source' not in tr.hookargs:
308 if source is not None and 'source' not in tr.hookargs:
309 tr.hookargs['source'] = source
309 tr.hookargs['source'] = source
310 if url is not None and 'url' not in tr.hookargs:
310 if url is not None and 'url' not in tr.hookargs:
311 tr.hookargs['url'] = url
311 tr.hookargs['url'] = url
312 return processbundle(repo, unbundler, lambda: tr, op=op)
312 return processbundle(repo, unbundler, lambda: tr, op=op)
313
313
314 def processbundle(repo, unbundler, transactiongetter=None, op=None):
314 def processbundle(repo, unbundler, transactiongetter=None, op=None):
315 """This function process a bundle, apply effect to/from a repo
315 """This function process a bundle, apply effect to/from a repo
316
316
317 It iterates over each part then searches for and uses the proper handling
317 It iterates over each part then searches for and uses the proper handling
318 code to process the part. Parts are processed in order.
318 code to process the part. Parts are processed in order.
319
319
320 This is very early version of this function that will be strongly reworked
320 This is very early version of this function that will be strongly reworked
321 before final usage.
321 before final usage.
322
322
323 Unknown Mandatory part will abort the process.
323 Unknown Mandatory part will abort the process.
324
324
325 It is temporarily possible to provide a prebuilt bundleoperation to the
325 It is temporarily possible to provide a prebuilt bundleoperation to the
326 function. This is used to ensure output is properly propagated in case of
326 function. This is used to ensure output is properly propagated in case of
327 an error during the unbundling. This output capturing part will likely be
327 an error during the unbundling. This output capturing part will likely be
328 reworked and this ability will probably go away in the process.
328 reworked and this ability will probably go away in the process.
329 """
329 """
330 if op is None:
330 if op is None:
331 if transactiongetter is None:
331 if transactiongetter is None:
332 transactiongetter = _notransaction
332 transactiongetter = _notransaction
333 op = bundleoperation(repo, transactiongetter)
333 op = bundleoperation(repo, transactiongetter)
334 # todo:
334 # todo:
335 # - replace this is a init function soon.
335 # - replace this is a init function soon.
336 # - exception catching
336 # - exception catching
337 unbundler.params
337 unbundler.params
338 if repo.ui.debugflag:
338 if repo.ui.debugflag:
339 msg = ['bundle2-input-bundle:']
339 msg = ['bundle2-input-bundle:']
340 if unbundler.params:
340 if unbundler.params:
341 msg.append(' %i params')
341 msg.append(' %i params')
342 if op.gettransaction is None:
342 if op.gettransaction is None:
343 msg.append(' no-transaction')
343 msg.append(' no-transaction')
344 else:
344 else:
345 msg.append(' with-transaction')
345 msg.append(' with-transaction')
346 msg.append('\n')
346 msg.append('\n')
347 repo.ui.debug(''.join(msg))
347 repo.ui.debug(''.join(msg))
348 iterparts = enumerate(unbundler.iterparts())
348 iterparts = enumerate(unbundler.iterparts())
349 part = None
349 part = None
350 nbpart = 0
350 nbpart = 0
351 try:
351 try:
352 for nbpart, part in iterparts:
352 for nbpart, part in iterparts:
353 _processpart(op, part)
353 _processpart(op, part)
354 except BaseException as exc:
354 except BaseException as exc:
355 for nbpart, part in iterparts:
355 for nbpart, part in iterparts:
356 # consume the bundle content
356 # consume the bundle content
357 part.seek(0, 2)
357 part.seek(0, 2)
358 # Small hack to let caller code distinguish exceptions from bundle2
358 # Small hack to let caller code distinguish exceptions from bundle2
359 # processing from processing the old format. This is mostly
359 # processing from processing the old format. This is mostly
360 # needed to handle different return codes to unbundle according to the
360 # needed to handle different return codes to unbundle according to the
361 # type of bundle. We should probably clean up or drop this return code
361 # type of bundle. We should probably clean up or drop this return code
362 # craziness in a future version.
362 # craziness in a future version.
363 exc.duringunbundle2 = True
363 exc.duringunbundle2 = True
364 salvaged = []
364 salvaged = []
365 replycaps = None
365 replycaps = None
366 if op.reply is not None:
366 if op.reply is not None:
367 salvaged = op.reply.salvageoutput()
367 salvaged = op.reply.salvageoutput()
368 replycaps = op.reply.capabilities
368 replycaps = op.reply.capabilities
369 exc._replycaps = replycaps
369 exc._replycaps = replycaps
370 exc._bundle2salvagedoutput = salvaged
370 exc._bundle2salvagedoutput = salvaged
371 raise
371 raise
372 finally:
372 finally:
373 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
373 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
374
374
375 return op
375 return op
376
376
377 def _processpart(op, part):
377 def _processpart(op, part):
378 """process a single part from a bundle
378 """process a single part from a bundle
379
379
380 The part is guaranteed to have been fully consumed when the function exits
380 The part is guaranteed to have been fully consumed when the function exits
381 (even if an exception is raised)."""
381 (even if an exception is raised)."""
382 status = 'unknown' # used by debug output
382 status = 'unknown' # used by debug output
383 try:
383 try:
384 try:
384 try:
385 handler = parthandlermapping.get(part.type)
385 handler = parthandlermapping.get(part.type)
386 if handler is None:
386 if handler is None:
387 status = 'unsupported-type'
387 status = 'unsupported-type'
388 raise error.BundleUnknownFeatureError(parttype=part.type)
388 raise error.BundleUnknownFeatureError(parttype=part.type)
389 indebug(op.ui, 'found a handler for part %r' % part.type)
389 indebug(op.ui, 'found a handler for part %r' % part.type)
390 unknownparams = part.mandatorykeys - handler.params
390 unknownparams = part.mandatorykeys - handler.params
391 if unknownparams:
391 if unknownparams:
392 unknownparams = list(unknownparams)
392 unknownparams = list(unknownparams)
393 unknownparams.sort()
393 unknownparams.sort()
394 status = 'unsupported-params (%s)' % unknownparams
394 status = 'unsupported-params (%s)' % unknownparams
395 raise error.BundleUnknownFeatureError(parttype=part.type,
395 raise error.BundleUnknownFeatureError(parttype=part.type,
396 params=unknownparams)
396 params=unknownparams)
397 status = 'supported'
397 status = 'supported'
398 except error.BundleUnknownFeatureError as exc:
398 except error.BundleUnknownFeatureError as exc:
399 if part.mandatory: # mandatory parts
399 if part.mandatory: # mandatory parts
400 raise
400 raise
401 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
401 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
402 return # skip to part processing
402 return # skip to part processing
403 finally:
403 finally:
404 if op.ui.debugflag:
404 if op.ui.debugflag:
405 msg = ['bundle2-input-part: "%s"' % part.type]
405 msg = ['bundle2-input-part: "%s"' % part.type]
406 if not part.mandatory:
406 if not part.mandatory:
407 msg.append(' (advisory)')
407 msg.append(' (advisory)')
408 nbmp = len(part.mandatorykeys)
408 nbmp = len(part.mandatorykeys)
409 nbap = len(part.params) - nbmp
409 nbap = len(part.params) - nbmp
410 if nbmp or nbap:
410 if nbmp or nbap:
411 msg.append(' (params:')
411 msg.append(' (params:')
412 if nbmp:
412 if nbmp:
413 msg.append(' %i mandatory' % nbmp)
413 msg.append(' %i mandatory' % nbmp)
414 if nbap:
414 if nbap:
415 msg.append(' %i advisory' % nbmp)
415 msg.append(' %i advisory' % nbmp)
416 msg.append(')')
416 msg.append(')')
417 msg.append(' %s\n' % status)
417 msg.append(' %s\n' % status)
418 op.ui.debug(''.join(msg))
418 op.ui.debug(''.join(msg))
419
419
420 # handler is called outside the above try block so that we don't
420 # handler is called outside the above try block so that we don't
421 # risk catching KeyErrors from anything other than the
421 # risk catching KeyErrors from anything other than the
422 # parthandlermapping lookup (any KeyError raised by handler()
422 # parthandlermapping lookup (any KeyError raised by handler()
423 # itself represents a defect of a different variety).
423 # itself represents a defect of a different variety).
424 output = None
424 output = None
425 if op.captureoutput and op.reply is not None:
425 if op.captureoutput and op.reply is not None:
426 op.ui.pushbuffer(error=True, subproc=True)
426 op.ui.pushbuffer(error=True, subproc=True)
427 output = ''
427 output = ''
428 try:
428 try:
429 handler(op, part)
429 handler(op, part)
430 finally:
430 finally:
431 if output is not None:
431 if output is not None:
432 output = op.ui.popbuffer()
432 output = op.ui.popbuffer()
433 if output:
433 if output:
434 outpart = op.reply.newpart('output', data=output,
434 outpart = op.reply.newpart('output', data=output,
435 mandatory=False)
435 mandatory=False)
436 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
436 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
437 finally:
437 finally:
438 # consume the part content to not corrupt the stream.
438 # consume the part content to not corrupt the stream.
439 part.seek(0, 2)
439 part.seek(0, 2)
440
440
441
441
442 def decodecaps(blob):
442 def decodecaps(blob):
443 """decode a bundle2 caps bytes blob into a dictionary
443 """decode a bundle2 caps bytes blob into a dictionary
444
444
445 The blob is a list of capabilities (one per line)
445 The blob is a list of capabilities (one per line)
446 Capabilities may have values using a line of the form::
446 Capabilities may have values using a line of the form::
447
447
448 capability=value1,value2,value3
448 capability=value1,value2,value3
449
449
450 The values are always a list."""
450 The values are always a list."""
451 caps = {}
451 caps = {}
452 for line in blob.splitlines():
452 for line in blob.splitlines():
453 if not line:
453 if not line:
454 continue
454 continue
455 if '=' not in line:
455 if '=' not in line:
456 key, vals = line, ()
456 key, vals = line, ()
457 else:
457 else:
458 key, vals = line.split('=', 1)
458 key, vals = line.split('=', 1)
459 vals = vals.split(',')
459 vals = vals.split(',')
460 key = urllib.unquote(key)
460 key = urllib.unquote(key)
461 vals = [urllib.unquote(v) for v in vals]
461 vals = [urllib.unquote(v) for v in vals]
462 caps[key] = vals
462 caps[key] = vals
463 return caps
463 return caps
464
464
465 def encodecaps(caps):
465 def encodecaps(caps):
466 """encode a bundle2 caps dictionary into a bytes blob"""
466 """encode a bundle2 caps dictionary into a bytes blob"""
467 chunks = []
467 chunks = []
468 for ca in sorted(caps):
468 for ca in sorted(caps):
469 vals = caps[ca]
469 vals = caps[ca]
470 ca = urllib.quote(ca)
470 ca = urllib.quote(ca)
471 vals = [urllib.quote(v) for v in vals]
471 vals = [urllib.quote(v) for v in vals]
472 if vals:
472 if vals:
473 ca = "%s=%s" % (ca, ','.join(vals))
473 ca = "%s=%s" % (ca, ','.join(vals))
474 chunks.append(ca)
474 chunks.append(ca)
475 return '\n'.join(chunks)
475 return '\n'.join(chunks)
476
476
477 bundletypes = {
477 bundletypes = {
478 "": ("", None), # only when using unbundle on ssh and old http servers
478 "": ("", None), # only when using unbundle on ssh and old http servers
479 # since the unification ssh accepts a header but there
479 # since the unification ssh accepts a header but there
480 # is no capability signaling it.
480 # is no capability signaling it.
481 "HG20": (), # special-cased below
481 "HG20": (), # special-cased below
482 "HG10UN": ("HG10UN", None),
482 "HG10UN": ("HG10UN", None),
483 "HG10BZ": ("HG10", 'BZ'),
483 "HG10BZ": ("HG10", 'BZ'),
484 "HG10GZ": ("HG10GZ", 'GZ'),
484 "HG10GZ": ("HG10GZ", 'GZ'),
485 }
485 }
486
486
487 # hgweb uses this list to communicate its preferred type
487 # hgweb uses this list to communicate its preferred type
488 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
488 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
489
489
490 class bundle20(object):
490 class bundle20(object):
491 """represent an outgoing bundle2 container
491 """represent an outgoing bundle2 container
492
492
493 Use the `addparam` method to add stream level parameter. and `newpart` to
493 Use the `addparam` method to add stream level parameter. and `newpart` to
494 populate it. Then call `getchunks` to retrieve all the binary chunks of
494 populate it. Then call `getchunks` to retrieve all the binary chunks of
495 data that compose the bundle2 container."""
495 data that compose the bundle2 container."""
496
496
497 _magicstring = 'HG20'
497 _magicstring = 'HG20'
498
498
499 def __init__(self, ui, capabilities=()):
499 def __init__(self, ui, capabilities=()):
500 self.ui = ui
500 self.ui = ui
501 self._params = []
501 self._params = []
502 self._parts = []
502 self._parts = []
503 self.capabilities = dict(capabilities)
503 self.capabilities = dict(capabilities)
504 self._compressor = util.compressors[None]()
504 self._compressor = util.compressors[None]()
505
505
506 def setcompression(self, alg):
506 def setcompression(self, alg):
507 """setup core part compression to <alg>"""
507 """setup core part compression to <alg>"""
508 if alg is None:
508 if alg is None:
509 return
509 return
510 assert not any(n.lower() == 'Compression' for n, v in self._params)
510 assert not any(n.lower() == 'Compression' for n, v in self._params)
511 self.addparam('Compression', alg)
511 self.addparam('Compression', alg)
512 self._compressor = util.compressors[alg]()
512 self._compressor = util.compressors[alg]()
513
513
514 @property
514 @property
515 def nbparts(self):
515 def nbparts(self):
516 """total number of parts added to the bundler"""
516 """total number of parts added to the bundler"""
517 return len(self._parts)
517 return len(self._parts)
518
518
519 # methods used to defines the bundle2 content
519 # methods used to defines the bundle2 content
520 def addparam(self, name, value=None):
520 def addparam(self, name, value=None):
521 """add a stream level parameter"""
521 """add a stream level parameter"""
522 if not name:
522 if not name:
523 raise ValueError('empty parameter name')
523 raise ValueError('empty parameter name')
524 if name[0] not in string.letters:
524 if name[0] not in string.letters:
525 raise ValueError('non letter first character: %r' % name)
525 raise ValueError('non letter first character: %r' % name)
526 self._params.append((name, value))
526 self._params.append((name, value))
527
527
528 def addpart(self, part):
528 def addpart(self, part):
529 """add a new part to the bundle2 container
529 """add a new part to the bundle2 container
530
530
531 Parts contains the actual applicative payload."""
531 Parts contains the actual applicative payload."""
532 assert part.id is None
532 assert part.id is None
533 part.id = len(self._parts) # very cheap counter
533 part.id = len(self._parts) # very cheap counter
534 self._parts.append(part)
534 self._parts.append(part)
535
535
536 def newpart(self, typeid, *args, **kwargs):
536 def newpart(self, typeid, *args, **kwargs):
537 """create a new part and add it to the containers
537 """create a new part and add it to the containers
538
538
539 As the part is directly added to the containers. For now, this means
539 As the part is directly added to the containers. For now, this means
540 that any failure to properly initialize the part after calling
540 that any failure to properly initialize the part after calling
541 ``newpart`` should result in a failure of the whole bundling process.
541 ``newpart`` should result in a failure of the whole bundling process.
542
542
543 You can still fall back to manually create and add if you need better
543 You can still fall back to manually create and add if you need better
544 control."""
544 control."""
545 part = bundlepart(typeid, *args, **kwargs)
545 part = bundlepart(typeid, *args, **kwargs)
546 self.addpart(part)
546 self.addpart(part)
547 return part
547 return part
548
548
549 # methods used to generate the bundle2 stream
549 # methods used to generate the bundle2 stream
550 def getchunks(self):
550 def getchunks(self):
551 if self.ui.debugflag:
551 if self.ui.debugflag:
552 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
552 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
553 if self._params:
553 if self._params:
554 msg.append(' (%i params)' % len(self._params))
554 msg.append(' (%i params)' % len(self._params))
555 msg.append(' %i parts total\n' % len(self._parts))
555 msg.append(' %i parts total\n' % len(self._parts))
556 self.ui.debug(''.join(msg))
556 self.ui.debug(''.join(msg))
557 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
557 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
558 yield self._magicstring
558 yield self._magicstring
559 param = self._paramchunk()
559 param = self._paramchunk()
560 outdebug(self.ui, 'bundle parameter: %s' % param)
560 outdebug(self.ui, 'bundle parameter: %s' % param)
561 yield _pack(_fstreamparamsize, len(param))
561 yield _pack(_fstreamparamsize, len(param))
562 if param:
562 if param:
563 yield param
563 yield param
564 # starting compression
564 # starting compression
565 for chunk in self._getcorechunk():
565 for chunk in self._getcorechunk():
566 yield self._compressor.compress(chunk)
566 yield self._compressor.compress(chunk)
567 yield self._compressor.flush()
567 yield self._compressor.flush()
568
568
569 def _paramchunk(self):
569 def _paramchunk(self):
570 """return a encoded version of all stream parameters"""
570 """return a encoded version of all stream parameters"""
571 blocks = []
571 blocks = []
572 for par, value in self._params:
572 for par, value in self._params:
573 par = urllib.quote(par)
573 par = urllib.quote(par)
574 if value is not None:
574 if value is not None:
575 value = urllib.quote(value)
575 value = urllib.quote(value)
576 par = '%s=%s' % (par, value)
576 par = '%s=%s' % (par, value)
577 blocks.append(par)
577 blocks.append(par)
578 return ' '.join(blocks)
578 return ' '.join(blocks)
579
579
580 def _getcorechunk(self):
580 def _getcorechunk(self):
581 """yield chunk for the core part of the bundle
581 """yield chunk for the core part of the bundle
582
582
583 (all but headers and parameters)"""
583 (all but headers and parameters)"""
584 outdebug(self.ui, 'start of parts')
584 outdebug(self.ui, 'start of parts')
585 for part in self._parts:
585 for part in self._parts:
586 outdebug(self.ui, 'bundle part: "%s"' % part.type)
586 outdebug(self.ui, 'bundle part: "%s"' % part.type)
587 for chunk in part.getchunks(ui=self.ui):
587 for chunk in part.getchunks(ui=self.ui):
588 yield chunk
588 yield chunk
589 outdebug(self.ui, 'end of bundle')
589 outdebug(self.ui, 'end of bundle')
590 yield _pack(_fpartheadersize, 0)
590 yield _pack(_fpartheadersize, 0)
591
591
592
592
593 def salvageoutput(self):
593 def salvageoutput(self):
594 """return a list with a copy of all output parts in the bundle
594 """return a list with a copy of all output parts in the bundle
595
595
596 This is meant to be used during error handling to make sure we preserve
596 This is meant to be used during error handling to make sure we preserve
597 server output"""
597 server output"""
598 salvaged = []
598 salvaged = []
599 for part in self._parts:
599 for part in self._parts:
600 if part.type.startswith('output'):
600 if part.type.startswith('output'):
601 salvaged.append(part.copy())
601 salvaged.append(part.copy())
602 return salvaged
602 return salvaged
603
603
604
604
605 class unpackermixin(object):
605 class unpackermixin(object):
606 """A mixin to extract bytes and struct data from a stream"""
606 """A mixin to extract bytes and struct data from a stream"""
607
607
608 def __init__(self, fp):
608 def __init__(self, fp):
609 self._fp = fp
609 self._fp = fp
610 self._seekable = (util.safehasattr(fp, 'seek') and
610 self._seekable = (util.safehasattr(fp, 'seek') and
611 util.safehasattr(fp, 'tell'))
611 util.safehasattr(fp, 'tell'))
612
612
613 def _unpack(self, format):
613 def _unpack(self, format):
614 """unpack this struct format from the stream"""
614 """unpack this struct format from the stream"""
615 data = self._readexact(struct.calcsize(format))
615 data = self._readexact(struct.calcsize(format))
616 return _unpack(format, data)
616 return _unpack(format, data)
617
617
618 def _readexact(self, size):
618 def _readexact(self, size):
619 """read exactly <size> bytes from the stream"""
619 """read exactly <size> bytes from the stream"""
620 return changegroup.readexactly(self._fp, size)
620 return changegroup.readexactly(self._fp, size)
621
621
622 def seek(self, offset, whence=0):
622 def seek(self, offset, whence=0):
623 """move the underlying file pointer"""
623 """move the underlying file pointer"""
624 if self._seekable:
624 if self._seekable:
625 return self._fp.seek(offset, whence)
625 return self._fp.seek(offset, whence)
626 else:
626 else:
627 raise NotImplementedError(_('File pointer is not seekable'))
627 raise NotImplementedError(_('File pointer is not seekable'))
628
628
629 def tell(self):
629 def tell(self):
630 """return the file offset, or None if file is not seekable"""
630 """return the file offset, or None if file is not seekable"""
631 if self._seekable:
631 if self._seekable:
632 try:
632 try:
633 return self._fp.tell()
633 return self._fp.tell()
634 except IOError as e:
634 except IOError as e:
635 if e.errno == errno.ESPIPE:
635 if e.errno == errno.ESPIPE:
636 self._seekable = False
636 self._seekable = False
637 else:
637 else:
638 raise
638 raise
639 return None
639 return None
640
640
641 def close(self):
641 def close(self):
642 """close underlying file"""
642 """close underlying file"""
643 if util.safehasattr(self._fp, 'close'):
643 if util.safehasattr(self._fp, 'close'):
644 return self._fp.close()
644 return self._fp.close()
645
645
646 def getunbundler(ui, fp, magicstring=None):
646 def getunbundler(ui, fp, magicstring=None):
647 """return a valid unbundler object for a given magicstring"""
647 """return a valid unbundler object for a given magicstring"""
648 if magicstring is None:
648 if magicstring is None:
649 magicstring = changegroup.readexactly(fp, 4)
649 magicstring = changegroup.readexactly(fp, 4)
650 magic, version = magicstring[0:2], magicstring[2:4]
650 magic, version = magicstring[0:2], magicstring[2:4]
651 if magic != 'HG':
651 if magic != 'HG':
652 raise error.Abort(_('not a Mercurial bundle'))
652 raise error.Abort(_('not a Mercurial bundle'))
653 unbundlerclass = formatmap.get(version)
653 unbundlerclass = formatmap.get(version)
654 if unbundlerclass is None:
654 if unbundlerclass is None:
655 raise error.Abort(_('unknown bundle version %s') % version)
655 raise error.Abort(_('unknown bundle version %s') % version)
656 unbundler = unbundlerclass(ui, fp)
656 unbundler = unbundlerclass(ui, fp)
657 indebug(ui, 'start processing of %s stream' % magicstring)
657 indebug(ui, 'start processing of %s stream' % magicstring)
658 return unbundler
658 return unbundler
659
659
660 class unbundle20(unpackermixin):
660 class unbundle20(unpackermixin):
661 """interpret a bundle2 stream
661 """interpret a bundle2 stream
662
662
663 This class is fed with a binary stream and yields parts through its
663 This class is fed with a binary stream and yields parts through its
664 `iterparts` methods."""
664 `iterparts` methods."""
665
665
666 _magicstring = 'HG20'
666 _magicstring = 'HG20'
667
667
668 def __init__(self, ui, fp):
668 def __init__(self, ui, fp):
669 """If header is specified, we do not read it out of the stream."""
669 """If header is specified, we do not read it out of the stream."""
670 self.ui = ui
670 self.ui = ui
671 self._decompressor = util.decompressors[None]
671 self._decompressor = util.decompressors[None]
672 self._compressed = None
672 self._compressed = None
673 super(unbundle20, self).__init__(fp)
673 super(unbundle20, self).__init__(fp)
674
674
675 @util.propertycache
675 @util.propertycache
676 def params(self):
676 def params(self):
677 """dictionary of stream level parameters"""
677 """dictionary of stream level parameters"""
678 indebug(self.ui, 'reading bundle2 stream parameters')
678 indebug(self.ui, 'reading bundle2 stream parameters')
679 params = {}
679 params = {}
680 paramssize = self._unpack(_fstreamparamsize)[0]
680 paramssize = self._unpack(_fstreamparamsize)[0]
681 if paramssize < 0:
681 if paramssize < 0:
682 raise error.BundleValueError('negative bundle param size: %i'
682 raise error.BundleValueError('negative bundle param size: %i'
683 % paramssize)
683 % paramssize)
684 if paramssize:
684 if paramssize:
685 params = self._readexact(paramssize)
685 params = self._readexact(paramssize)
686 params = self._processallparams(params)
686 params = self._processallparams(params)
687 return params
687 return params
688
688
689 def _processallparams(self, paramsblock):
689 def _processallparams(self, paramsblock):
690 """"""
690 """"""
691 params = {}
691 params = {}
692 for p in paramsblock.split(' '):
692 for p in paramsblock.split(' '):
693 p = p.split('=', 1)
693 p = p.split('=', 1)
694 p = [urllib.unquote(i) for i in p]
694 p = [urllib.unquote(i) for i in p]
695 if len(p) < 2:
695 if len(p) < 2:
696 p.append(None)
696 p.append(None)
697 self._processparam(*p)
697 self._processparam(*p)
698 params[p[0]] = p[1]
698 params[p[0]] = p[1]
699 return params
699 return params
700
700
701
701
702 def _processparam(self, name, value):
702 def _processparam(self, name, value):
703 """process a parameter, applying its effect if needed
703 """process a parameter, applying its effect if needed
704
704
705 Parameter starting with a lower case letter are advisory and will be
705 Parameter starting with a lower case letter are advisory and will be
706 ignored when unknown. Those starting with an upper case letter are
706 ignored when unknown. Those starting with an upper case letter are
707 mandatory and will this function will raise a KeyError when unknown.
707 mandatory and will this function will raise a KeyError when unknown.
708
708
709 Note: no option are currently supported. Any input will be either
709 Note: no option are currently supported. Any input will be either
710 ignored or failing.
710 ignored or failing.
711 """
711 """
712 if not name:
712 if not name:
713 raise ValueError('empty parameter name')
713 raise ValueError('empty parameter name')
714 if name[0] not in string.letters:
714 if name[0] not in string.letters:
715 raise ValueError('non letter first character: %r' % name)
715 raise ValueError('non letter first character: %r' % name)
716 try:
716 try:
717 handler = b2streamparamsmap[name.lower()]
717 handler = b2streamparamsmap[name.lower()]
718 except KeyError:
718 except KeyError:
719 if name[0].islower():
719 if name[0].islower():
720 indebug(self.ui, "ignoring unknown parameter %r" % name)
720 indebug(self.ui, "ignoring unknown parameter %r" % name)
721 else:
721 else:
722 raise error.BundleUnknownFeatureError(params=(name,))
722 raise error.BundleUnknownFeatureError(params=(name,))
723 else:
723 else:
724 handler(self, name, value)
724 handler(self, name, value)
725
725
726 def _forwardchunks(self):
726 def _forwardchunks(self):
727 """utility to transfer a bundle2 as binary
727 """utility to transfer a bundle2 as binary
728
728
729 This is made necessary by the fact the 'getbundle' command over 'ssh'
729 This is made necessary by the fact the 'getbundle' command over 'ssh'
730 have no way to know then the reply end, relying on the bundle to be
730 have no way to know then the reply end, relying on the bundle to be
731 interpreted to know its end. This is terrible and we are sorry, but we
731 interpreted to know its end. This is terrible and we are sorry, but we
732 needed to move forward to get general delta enabled.
732 needed to move forward to get general delta enabled.
733 """
733 """
734 yield self._magicstring
734 yield self._magicstring
735 assert 'params' not in vars(self)
735 assert 'params' not in vars(self)
736 paramssize = self._unpack(_fstreamparamsize)[0]
736 paramssize = self._unpack(_fstreamparamsize)[0]
737 if paramssize < 0:
737 if paramssize < 0:
738 raise error.BundleValueError('negative bundle param size: %i'
738 raise error.BundleValueError('negative bundle param size: %i'
739 % paramssize)
739 % paramssize)
740 yield _pack(_fstreamparamsize, paramssize)
740 yield _pack(_fstreamparamsize, paramssize)
741 if paramssize:
741 if paramssize:
742 params = self._readexact(paramssize)
742 params = self._readexact(paramssize)
743 self._processallparams(params)
743 self._processallparams(params)
744 yield params
744 yield params
745 assert self._decompressor is util.decompressors[None]
745 assert self._decompressor is util.decompressors[None]
746 # From there, payload might need to be decompressed
746 # From there, payload might need to be decompressed
747 self._fp = self._decompressor(self._fp)
747 self._fp = self._decompressor(self._fp)
748 emptycount = 0
748 emptycount = 0
749 while emptycount < 2:
749 while emptycount < 2:
750 # so we can brainlessly loop
750 # so we can brainlessly loop
751 assert _fpartheadersize == _fpayloadsize
751 assert _fpartheadersize == _fpayloadsize
752 size = self._unpack(_fpartheadersize)[0]
752 size = self._unpack(_fpartheadersize)[0]
753 yield _pack(_fpartheadersize, size)
753 yield _pack(_fpartheadersize, size)
754 if size:
754 if size:
755 emptycount = 0
755 emptycount = 0
756 else:
756 else:
757 emptycount += 1
757 emptycount += 1
758 continue
758 continue
759 if size == flaginterrupt:
759 if size == flaginterrupt:
760 continue
760 continue
761 elif size < 0:
761 elif size < 0:
762 raise error.BundleValueError('negative chunk size: %i')
762 raise error.BundleValueError('negative chunk size: %i')
763 yield self._readexact(size)
763 yield self._readexact(size)
764
764
765
765
766 def iterparts(self):
766 def iterparts(self):
767 """yield all parts contained in the stream"""
767 """yield all parts contained in the stream"""
768 # make sure param have been loaded
768 # make sure param have been loaded
769 self.params
769 self.params
770 # From there, payload need to be decompressed
770 # From there, payload need to be decompressed
771 self._fp = self._decompressor(self._fp)
771 self._fp = self._decompressor(self._fp)
772 indebug(self.ui, 'start extraction of bundle2 parts')
772 indebug(self.ui, 'start extraction of bundle2 parts')
773 headerblock = self._readpartheader()
773 headerblock = self._readpartheader()
774 while headerblock is not None:
774 while headerblock is not None:
775 part = unbundlepart(self.ui, headerblock, self._fp)
775 part = unbundlepart(self.ui, headerblock, self._fp)
776 yield part
776 yield part
777 part.seek(0, 2)
777 part.seek(0, 2)
778 headerblock = self._readpartheader()
778 headerblock = self._readpartheader()
779 indebug(self.ui, 'end of bundle2 stream')
779 indebug(self.ui, 'end of bundle2 stream')
780
780
781 def _readpartheader(self):
781 def _readpartheader(self):
782 """reads a part header size and return the bytes blob
782 """reads a part header size and return the bytes blob
783
783
784 returns None if empty"""
784 returns None if empty"""
785 headersize = self._unpack(_fpartheadersize)[0]
785 headersize = self._unpack(_fpartheadersize)[0]
786 if headersize < 0:
786 if headersize < 0:
787 raise error.BundleValueError('negative part header size: %i'
787 raise error.BundleValueError('negative part header size: %i'
788 % headersize)
788 % headersize)
789 indebug(self.ui, 'part header size: %i' % headersize)
789 indebug(self.ui, 'part header size: %i' % headersize)
790 if headersize:
790 if headersize:
791 return self._readexact(headersize)
791 return self._readexact(headersize)
792 return None
792 return None
793
793
794 def compressed(self):
794 def compressed(self):
795 self.params # load params
795 self.params # load params
796 return self._compressed
796 return self._compressed
797
797
798 formatmap = {'20': unbundle20}
798 formatmap = {'20': unbundle20}
799
799
800 b2streamparamsmap = {}
800 b2streamparamsmap = {}
801
801
802 def b2streamparamhandler(name):
802 def b2streamparamhandler(name):
803 """register a handler for a stream level parameter"""
803 """register a handler for a stream level parameter"""
804 def decorator(func):
804 def decorator(func):
805 assert name not in formatmap
805 assert name not in formatmap
806 b2streamparamsmap[name] = func
806 b2streamparamsmap[name] = func
807 return func
807 return func
808 return decorator
808 return decorator
809
809
810 @b2streamparamhandler('compression')
810 @b2streamparamhandler('compression')
811 def processcompression(unbundler, param, value):
811 def processcompression(unbundler, param, value):
812 """read compression parameter and install payload decompression"""
812 """read compression parameter and install payload decompression"""
813 if value not in util.decompressors:
813 if value not in util.decompressors:
814 raise error.BundleUnknownFeatureError(params=(param,),
814 raise error.BundleUnknownFeatureError(params=(param,),
815 values=(value,))
815 values=(value,))
816 unbundler._decompressor = util.decompressors[value]
816 unbundler._decompressor = util.decompressors[value]
817 if value is not None:
817 if value is not None:
818 unbundler._compressed = True
818 unbundler._compressed = True
819
819
820 class bundlepart(object):
820 class bundlepart(object):
821 """A bundle2 part contains application level payload
821 """A bundle2 part contains application level payload
822
822
823 The part `type` is used to route the part to the application level
823 The part `type` is used to route the part to the application level
824 handler.
824 handler.
825
825
826 The part payload is contained in ``part.data``. It could be raw bytes or a
826 The part payload is contained in ``part.data``. It could be raw bytes or a
827 generator of byte chunks.
827 generator of byte chunks.
828
828
829 You can add parameters to the part using the ``addparam`` method.
829 You can add parameters to the part using the ``addparam`` method.
830 Parameters can be either mandatory (default) or advisory. Remote side
830 Parameters can be either mandatory (default) or advisory. Remote side
831 should be able to safely ignore the advisory ones.
831 should be able to safely ignore the advisory ones.
832
832
833 Both data and parameters cannot be modified after the generation has begun.
833 Both data and parameters cannot be modified after the generation has begun.
834 """
834 """
835
835
836 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
836 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
837 data='', mandatory=True):
837 data='', mandatory=True):
838 validateparttype(parttype)
838 validateparttype(parttype)
839 self.id = None
839 self.id = None
840 self.type = parttype
840 self.type = parttype
841 self._data = data
841 self._data = data
842 self._mandatoryparams = list(mandatoryparams)
842 self._mandatoryparams = list(mandatoryparams)
843 self._advisoryparams = list(advisoryparams)
843 self._advisoryparams = list(advisoryparams)
844 # checking for duplicated entries
844 # checking for duplicated entries
845 self._seenparams = set()
845 self._seenparams = set()
846 for pname, __ in self._mandatoryparams + self._advisoryparams:
846 for pname, __ in self._mandatoryparams + self._advisoryparams:
847 if pname in self._seenparams:
847 if pname in self._seenparams:
848 raise RuntimeError('duplicated params: %s' % pname)
848 raise RuntimeError('duplicated params: %s' % pname)
849 self._seenparams.add(pname)
849 self._seenparams.add(pname)
850 # status of the part's generation:
850 # status of the part's generation:
851 # - None: not started,
851 # - None: not started,
852 # - False: currently generated,
852 # - False: currently generated,
853 # - True: generation done.
853 # - True: generation done.
854 self._generated = None
854 self._generated = None
855 self.mandatory = mandatory
855 self.mandatory = mandatory
856
856
857 def copy(self):
857 def copy(self):
858 """return a copy of the part
858 """return a copy of the part
859
859
860 The new part have the very same content but no partid assigned yet.
860 The new part have the very same content but no partid assigned yet.
861 Parts with generated data cannot be copied."""
861 Parts with generated data cannot be copied."""
862 assert not util.safehasattr(self.data, 'next')
862 assert not util.safehasattr(self.data, 'next')
863 return self.__class__(self.type, self._mandatoryparams,
863 return self.__class__(self.type, self._mandatoryparams,
864 self._advisoryparams, self._data, self.mandatory)
864 self._advisoryparams, self._data, self.mandatory)
865
865
866 # methods used to defines the part content
866 # methods used to defines the part content
867 @property
867 @property
868 def data(self):
868 def data(self):
869 return self._data
869 return self._data
870
870
871 @data.setter
871 @data.setter
872 def data(self, data):
872 def data(self, data):
873 if self._generated is not None:
873 if self._generated is not None:
874 raise error.ReadOnlyPartError('part is being generated')
874 raise error.ReadOnlyPartError('part is being generated')
875 self._data = data
875 self._data = data
876
876
877 @property
877 @property
878 def mandatoryparams(self):
878 def mandatoryparams(self):
879 # make it an immutable tuple to force people through ``addparam``
879 # make it an immutable tuple to force people through ``addparam``
880 return tuple(self._mandatoryparams)
880 return tuple(self._mandatoryparams)
881
881
882 @property
882 @property
883 def advisoryparams(self):
883 def advisoryparams(self):
884 # make it an immutable tuple to force people through ``addparam``
884 # make it an immutable tuple to force people through ``addparam``
885 return tuple(self._advisoryparams)
885 return tuple(self._advisoryparams)
886
886
887 def addparam(self, name, value='', mandatory=True):
887 def addparam(self, name, value='', mandatory=True):
888 if self._generated is not None:
888 if self._generated is not None:
889 raise error.ReadOnlyPartError('part is being generated')
889 raise error.ReadOnlyPartError('part is being generated')
890 if name in self._seenparams:
890 if name in self._seenparams:
891 raise ValueError('duplicated params: %s' % name)
891 raise ValueError('duplicated params: %s' % name)
892 self._seenparams.add(name)
892 self._seenparams.add(name)
893 params = self._advisoryparams
893 params = self._advisoryparams
894 if mandatory:
894 if mandatory:
895 params = self._mandatoryparams
895 params = self._mandatoryparams
896 params.append((name, value))
896 params.append((name, value))
897
897
898 # methods used to generates the bundle2 stream
898 # methods used to generates the bundle2 stream
899 def getchunks(self, ui):
899 def getchunks(self, ui):
900 if self._generated is not None:
900 if self._generated is not None:
901 raise RuntimeError('part can only be consumed once')
901 raise RuntimeError('part can only be consumed once')
902 self._generated = False
902 self._generated = False
903
903
904 if ui.debugflag:
904 if ui.debugflag:
905 msg = ['bundle2-output-part: "%s"' % self.type]
905 msg = ['bundle2-output-part: "%s"' % self.type]
906 if not self.mandatory:
906 if not self.mandatory:
907 msg.append(' (advisory)')
907 msg.append(' (advisory)')
908 nbmp = len(self.mandatoryparams)
908 nbmp = len(self.mandatoryparams)
909 nbap = len(self.advisoryparams)
909 nbap = len(self.advisoryparams)
910 if nbmp or nbap:
910 if nbmp or nbap:
911 msg.append(' (params:')
911 msg.append(' (params:')
912 if nbmp:
912 if nbmp:
913 msg.append(' %i mandatory' % nbmp)
913 msg.append(' %i mandatory' % nbmp)
914 if nbap:
914 if nbap:
915 msg.append(' %i advisory' % nbmp)
915 msg.append(' %i advisory' % nbmp)
916 msg.append(')')
916 msg.append(')')
917 if not self.data:
917 if not self.data:
918 msg.append(' empty payload')
918 msg.append(' empty payload')
919 elif util.safehasattr(self.data, 'next'):
919 elif util.safehasattr(self.data, 'next'):
920 msg.append(' streamed payload')
920 msg.append(' streamed payload')
921 else:
921 else:
922 msg.append(' %i bytes payload' % len(self.data))
922 msg.append(' %i bytes payload' % len(self.data))
923 msg.append('\n')
923 msg.append('\n')
924 ui.debug(''.join(msg))
924 ui.debug(''.join(msg))
925
925
926 #### header
926 #### header
927 if self.mandatory:
927 if self.mandatory:
928 parttype = self.type.upper()
928 parttype = self.type.upper()
929 else:
929 else:
930 parttype = self.type.lower()
930 parttype = self.type.lower()
931 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
931 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
932 ## parttype
932 ## parttype
933 header = [_pack(_fparttypesize, len(parttype)),
933 header = [_pack(_fparttypesize, len(parttype)),
934 parttype, _pack(_fpartid, self.id),
934 parttype, _pack(_fpartid, self.id),
935 ]
935 ]
936 ## parameters
936 ## parameters
937 # count
937 # count
938 manpar = self.mandatoryparams
938 manpar = self.mandatoryparams
939 advpar = self.advisoryparams
939 advpar = self.advisoryparams
940 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
940 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
941 # size
941 # size
942 parsizes = []
942 parsizes = []
943 for key, value in manpar:
943 for key, value in manpar:
944 parsizes.append(len(key))
944 parsizes.append(len(key))
945 parsizes.append(len(value))
945 parsizes.append(len(value))
946 for key, value in advpar:
946 for key, value in advpar:
947 parsizes.append(len(key))
947 parsizes.append(len(key))
948 parsizes.append(len(value))
948 parsizes.append(len(value))
949 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
949 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
950 header.append(paramsizes)
950 header.append(paramsizes)
951 # key, value
951 # key, value
952 for key, value in manpar:
952 for key, value in manpar:
953 header.append(key)
953 header.append(key)
954 header.append(value)
954 header.append(value)
955 for key, value in advpar:
955 for key, value in advpar:
956 header.append(key)
956 header.append(key)
957 header.append(value)
957 header.append(value)
958 ## finalize header
958 ## finalize header
959 headerchunk = ''.join(header)
959 headerchunk = ''.join(header)
960 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
960 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
961 yield _pack(_fpartheadersize, len(headerchunk))
961 yield _pack(_fpartheadersize, len(headerchunk))
962 yield headerchunk
962 yield headerchunk
963 ## payload
963 ## payload
964 try:
964 try:
965 for chunk in self._payloadchunks():
965 for chunk in self._payloadchunks():
966 outdebug(ui, 'payload chunk size: %i' % len(chunk))
966 outdebug(ui, 'payload chunk size: %i' % len(chunk))
967 yield _pack(_fpayloadsize, len(chunk))
967 yield _pack(_fpayloadsize, len(chunk))
968 yield chunk
968 yield chunk
969 except GeneratorExit:
969 except GeneratorExit:
970 # GeneratorExit means that nobody is listening for our
970 # GeneratorExit means that nobody is listening for our
971 # results anyway, so just bail quickly rather than trying
971 # results anyway, so just bail quickly rather than trying
972 # to produce an error part.
972 # to produce an error part.
973 ui.debug('bundle2-generatorexit\n')
973 ui.debug('bundle2-generatorexit\n')
974 raise
974 raise
975 except BaseException as exc:
975 except BaseException as exc:
976 # backup exception data for later
976 # backup exception data for later
977 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
977 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
978 % exc)
978 % exc)
979 exc_info = sys.exc_info()
979 exc_info = sys.exc_info()
980 msg = 'unexpected error: %s' % exc
980 msg = 'unexpected error: %s' % exc
981 interpart = bundlepart('error:abort', [('message', msg)],
981 interpart = bundlepart('error:abort', [('message', msg)],
982 mandatory=False)
982 mandatory=False)
983 interpart.id = 0
983 interpart.id = 0
984 yield _pack(_fpayloadsize, -1)
984 yield _pack(_fpayloadsize, -1)
985 for chunk in interpart.getchunks(ui=ui):
985 for chunk in interpart.getchunks(ui=ui):
986 yield chunk
986 yield chunk
987 outdebug(ui, 'closing payload chunk')
987 outdebug(ui, 'closing payload chunk')
988 # abort current part payload
988 # abort current part payload
989 yield _pack(_fpayloadsize, 0)
989 yield _pack(_fpayloadsize, 0)
990 raise exc_info[0], exc_info[1], exc_info[2]
990 raise exc_info[0], exc_info[1], exc_info[2]
991 # end of payload
991 # end of payload
992 outdebug(ui, 'closing payload chunk')
992 outdebug(ui, 'closing payload chunk')
993 yield _pack(_fpayloadsize, 0)
993 yield _pack(_fpayloadsize, 0)
994 self._generated = True
994 self._generated = True
995
995
996 def _payloadchunks(self):
996 def _payloadchunks(self):
997 """yield chunks of a the part payload
997 """yield chunks of a the part payload
998
998
999 Exists to handle the different methods to provide data to a part."""
999 Exists to handle the different methods to provide data to a part."""
1000 # we only support fixed size data now.
1000 # we only support fixed size data now.
1001 # This will be improved in the future.
1001 # This will be improved in the future.
1002 if util.safehasattr(self.data, 'next'):
1002 if util.safehasattr(self.data, 'next'):
1003 buff = util.chunkbuffer(self.data)
1003 buff = util.chunkbuffer(self.data)
1004 chunk = buff.read(preferedchunksize)
1004 chunk = buff.read(preferedchunksize)
1005 while chunk:
1005 while chunk:
1006 yield chunk
1006 yield chunk
1007 chunk = buff.read(preferedchunksize)
1007 chunk = buff.read(preferedchunksize)
1008 elif len(self.data):
1008 elif len(self.data):
1009 yield self.data
1009 yield self.data
1010
1010
1011
1011
1012 flaginterrupt = -1
1012 flaginterrupt = -1
1013
1013
1014 class interrupthandler(unpackermixin):
1014 class interrupthandler(unpackermixin):
1015 """read one part and process it with restricted capability
1015 """read one part and process it with restricted capability
1016
1016
1017 This allows to transmit exception raised on the producer size during part
1017 This allows to transmit exception raised on the producer size during part
1018 iteration while the consumer is reading a part.
1018 iteration while the consumer is reading a part.
1019
1019
1020 Part processed in this manner only have access to a ui object,"""
1020 Part processed in this manner only have access to a ui object,"""
1021
1021
1022 def __init__(self, ui, fp):
1022 def __init__(self, ui, fp):
1023 super(interrupthandler, self).__init__(fp)
1023 super(interrupthandler, self).__init__(fp)
1024 self.ui = ui
1024 self.ui = ui
1025
1025
1026 def _readpartheader(self):
1026 def _readpartheader(self):
1027 """reads a part header size and return the bytes blob
1027 """reads a part header size and return the bytes blob
1028
1028
1029 returns None if empty"""
1029 returns None if empty"""
1030 headersize = self._unpack(_fpartheadersize)[0]
1030 headersize = self._unpack(_fpartheadersize)[0]
1031 if headersize < 0:
1031 if headersize < 0:
1032 raise error.BundleValueError('negative part header size: %i'
1032 raise error.BundleValueError('negative part header size: %i'
1033 % headersize)
1033 % headersize)
1034 indebug(self.ui, 'part header size: %i\n' % headersize)
1034 indebug(self.ui, 'part header size: %i\n' % headersize)
1035 if headersize:
1035 if headersize:
1036 return self._readexact(headersize)
1036 return self._readexact(headersize)
1037 return None
1037 return None
1038
1038
1039 def __call__(self):
1039 def __call__(self):
1040
1040
1041 self.ui.debug('bundle2-input-stream-interrupt:'
1041 self.ui.debug('bundle2-input-stream-interrupt:'
1042 ' opening out of band context\n')
1042 ' opening out of band context\n')
1043 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1043 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1044 headerblock = self._readpartheader()
1044 headerblock = self._readpartheader()
1045 if headerblock is None:
1045 if headerblock is None:
1046 indebug(self.ui, 'no part found during interruption.')
1046 indebug(self.ui, 'no part found during interruption.')
1047 return
1047 return
1048 part = unbundlepart(self.ui, headerblock, self._fp)
1048 part = unbundlepart(self.ui, headerblock, self._fp)
1049 op = interruptoperation(self.ui)
1049 op = interruptoperation(self.ui)
1050 _processpart(op, part)
1050 _processpart(op, part)
1051 self.ui.debug('bundle2-input-stream-interrupt:'
1051 self.ui.debug('bundle2-input-stream-interrupt:'
1052 ' closing out of band context\n')
1052 ' closing out of band context\n')
1053
1053
1054 class interruptoperation(object):
1054 class interruptoperation(object):
1055 """A limited operation to be use by part handler during interruption
1055 """A limited operation to be use by part handler during interruption
1056
1056
1057 It only have access to an ui object.
1057 It only have access to an ui object.
1058 """
1058 """
1059
1059
1060 def __init__(self, ui):
1060 def __init__(self, ui):
1061 self.ui = ui
1061 self.ui = ui
1062 self.reply = None
1062 self.reply = None
1063 self.captureoutput = False
1063 self.captureoutput = False
1064
1064
1065 @property
1065 @property
1066 def repo(self):
1066 def repo(self):
1067 raise RuntimeError('no repo access from stream interruption')
1067 raise RuntimeError('no repo access from stream interruption')
1068
1068
1069 def gettransaction(self):
1069 def gettransaction(self):
1070 raise TransactionUnavailable('no repo access from stream interruption')
1070 raise TransactionUnavailable('no repo access from stream interruption')
1071
1071
1072 class unbundlepart(unpackermixin):
1072 class unbundlepart(unpackermixin):
1073 """a bundle part read from a bundle"""
1073 """a bundle part read from a bundle"""
1074
1074
1075 def __init__(self, ui, header, fp):
1075 def __init__(self, ui, header, fp):
1076 super(unbundlepart, self).__init__(fp)
1076 super(unbundlepart, self).__init__(fp)
1077 self.ui = ui
1077 self.ui = ui
1078 # unbundle state attr
1078 # unbundle state attr
1079 self._headerdata = header
1079 self._headerdata = header
1080 self._headeroffset = 0
1080 self._headeroffset = 0
1081 self._initialized = False
1081 self._initialized = False
1082 self.consumed = False
1082 self.consumed = False
1083 # part data
1083 # part data
1084 self.id = None
1084 self.id = None
1085 self.type = None
1085 self.type = None
1086 self.mandatoryparams = None
1086 self.mandatoryparams = None
1087 self.advisoryparams = None
1087 self.advisoryparams = None
1088 self.params = None
1088 self.params = None
1089 self.mandatorykeys = ()
1089 self.mandatorykeys = ()
1090 self._payloadstream = None
1090 self._payloadstream = None
1091 self._readheader()
1091 self._readheader()
1092 self._mandatory = None
1092 self._mandatory = None
1093 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1093 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1094 self._pos = 0
1094 self._pos = 0
1095
1095
1096 def _fromheader(self, size):
1096 def _fromheader(self, size):
1097 """return the next <size> byte from the header"""
1097 """return the next <size> byte from the header"""
1098 offset = self._headeroffset
1098 offset = self._headeroffset
1099 data = self._headerdata[offset:(offset + size)]
1099 data = self._headerdata[offset:(offset + size)]
1100 self._headeroffset = offset + size
1100 self._headeroffset = offset + size
1101 return data
1101 return data
1102
1102
1103 def _unpackheader(self, format):
1103 def _unpackheader(self, format):
1104 """read given format from header
1104 """read given format from header
1105
1105
1106 This automatically compute the size of the format to read."""
1106 This automatically compute the size of the format to read."""
1107 data = self._fromheader(struct.calcsize(format))
1107 data = self._fromheader(struct.calcsize(format))
1108 return _unpack(format, data)
1108 return _unpack(format, data)
1109
1109
1110 def _initparams(self, mandatoryparams, advisoryparams):
1110 def _initparams(self, mandatoryparams, advisoryparams):
1111 """internal function to setup all logic related parameters"""
1111 """internal function to setup all logic related parameters"""
1112 # make it read only to prevent people touching it by mistake.
1112 # make it read only to prevent people touching it by mistake.
1113 self.mandatoryparams = tuple(mandatoryparams)
1113 self.mandatoryparams = tuple(mandatoryparams)
1114 self.advisoryparams = tuple(advisoryparams)
1114 self.advisoryparams = tuple(advisoryparams)
1115 # user friendly UI
1115 # user friendly UI
1116 self.params = dict(self.mandatoryparams)
1116 self.params = dict(self.mandatoryparams)
1117 self.params.update(dict(self.advisoryparams))
1117 self.params.update(dict(self.advisoryparams))
1118 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1118 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1119
1119
1120 def _payloadchunks(self, chunknum=0):
1120 def _payloadchunks(self, chunknum=0):
1121 '''seek to specified chunk and start yielding data'''
1121 '''seek to specified chunk and start yielding data'''
1122 if len(self._chunkindex) == 0:
1122 if len(self._chunkindex) == 0:
1123 assert chunknum == 0, 'Must start with chunk 0'
1123 assert chunknum == 0, 'Must start with chunk 0'
1124 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1124 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1125 else:
1125 else:
1126 assert chunknum < len(self._chunkindex), \
1126 assert chunknum < len(self._chunkindex), \
1127 'Unknown chunk %d' % chunknum
1127 'Unknown chunk %d' % chunknum
1128 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1128 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1129
1129
1130 pos = self._chunkindex[chunknum][0]
1130 pos = self._chunkindex[chunknum][0]
1131 payloadsize = self._unpack(_fpayloadsize)[0]
1131 payloadsize = self._unpack(_fpayloadsize)[0]
1132 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1132 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1133 while payloadsize:
1133 while payloadsize:
1134 if payloadsize == flaginterrupt:
1134 if payloadsize == flaginterrupt:
1135 # interruption detection, the handler will now read a
1135 # interruption detection, the handler will now read a
1136 # single part and process it.
1136 # single part and process it.
1137 interrupthandler(self.ui, self._fp)()
1137 interrupthandler(self.ui, self._fp)()
1138 elif payloadsize < 0:
1138 elif payloadsize < 0:
1139 msg = 'negative payload chunk size: %i' % payloadsize
1139 msg = 'negative payload chunk size: %i' % payloadsize
1140 raise error.BundleValueError(msg)
1140 raise error.BundleValueError(msg)
1141 else:
1141 else:
1142 result = self._readexact(payloadsize)
1142 result = self._readexact(payloadsize)
1143 chunknum += 1
1143 chunknum += 1
1144 pos += payloadsize
1144 pos += payloadsize
1145 if chunknum == len(self._chunkindex):
1145 if chunknum == len(self._chunkindex):
1146 self._chunkindex.append((pos,
1146 self._chunkindex.append((pos,
1147 super(unbundlepart, self).tell()))
1147 super(unbundlepart, self).tell()))
1148 yield result
1148 yield result
1149 payloadsize = self._unpack(_fpayloadsize)[0]
1149 payloadsize = self._unpack(_fpayloadsize)[0]
1150 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1150 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1151
1151
1152 def _findchunk(self, pos):
1152 def _findchunk(self, pos):
1153 '''for a given payload position, return a chunk number and offset'''
1153 '''for a given payload position, return a chunk number and offset'''
1154 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1154 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1155 if ppos == pos:
1155 if ppos == pos:
1156 return chunk, 0
1156 return chunk, 0
1157 elif ppos > pos:
1157 elif ppos > pos:
1158 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1158 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1159 raise ValueError('Unknown chunk')
1159 raise ValueError('Unknown chunk')
1160
1160
1161 def _readheader(self):
1161 def _readheader(self):
1162 """read the header and setup the object"""
1162 """read the header and setup the object"""
1163 typesize = self._unpackheader(_fparttypesize)[0]
1163 typesize = self._unpackheader(_fparttypesize)[0]
1164 self.type = self._fromheader(typesize)
1164 self.type = self._fromheader(typesize)
1165 indebug(self.ui, 'part type: "%s"' % self.type)
1165 indebug(self.ui, 'part type: "%s"' % self.type)
1166 self.id = self._unpackheader(_fpartid)[0]
1166 self.id = self._unpackheader(_fpartid)[0]
1167 indebug(self.ui, 'part id: "%s"' % self.id)
1167 indebug(self.ui, 'part id: "%s"' % self.id)
1168 # extract mandatory bit from type
1168 # extract mandatory bit from type
1169 self.mandatory = (self.type != self.type.lower())
1169 self.mandatory = (self.type != self.type.lower())
1170 self.type = self.type.lower()
1170 self.type = self.type.lower()
1171 ## reading parameters
1171 ## reading parameters
1172 # param count
1172 # param count
1173 mancount, advcount = self._unpackheader(_fpartparamcount)
1173 mancount, advcount = self._unpackheader(_fpartparamcount)
1174 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1174 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1175 # param size
1175 # param size
1176 fparamsizes = _makefpartparamsizes(mancount + advcount)
1176 fparamsizes = _makefpartparamsizes(mancount + advcount)
1177 paramsizes = self._unpackheader(fparamsizes)
1177 paramsizes = self._unpackheader(fparamsizes)
1178 # make it a list of couple again
1178 # make it a list of couple again
1179 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1179 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1180 # split mandatory from advisory
1180 # split mandatory from advisory
1181 mansizes = paramsizes[:mancount]
1181 mansizes = paramsizes[:mancount]
1182 advsizes = paramsizes[mancount:]
1182 advsizes = paramsizes[mancount:]
1183 # retrieve param value
1183 # retrieve param value
1184 manparams = []
1184 manparams = []
1185 for key, value in mansizes:
1185 for key, value in mansizes:
1186 manparams.append((self._fromheader(key), self._fromheader(value)))
1186 manparams.append((self._fromheader(key), self._fromheader(value)))
1187 advparams = []
1187 advparams = []
1188 for key, value in advsizes:
1188 for key, value in advsizes:
1189 advparams.append((self._fromheader(key), self._fromheader(value)))
1189 advparams.append((self._fromheader(key), self._fromheader(value)))
1190 self._initparams(manparams, advparams)
1190 self._initparams(manparams, advparams)
1191 ## part payload
1191 ## part payload
1192 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1192 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1193 # we read the data, tell it
1193 # we read the data, tell it
1194 self._initialized = True
1194 self._initialized = True
1195
1195
1196 def read(self, size=None):
1196 def read(self, size=None):
1197 """read payload data"""
1197 """read payload data"""
1198 if not self._initialized:
1198 if not self._initialized:
1199 self._readheader()
1199 self._readheader()
1200 if size is None:
1200 if size is None:
1201 data = self._payloadstream.read()
1201 data = self._payloadstream.read()
1202 else:
1202 else:
1203 data = self._payloadstream.read(size)
1203 data = self._payloadstream.read(size)
1204 self._pos += len(data)
1204 self._pos += len(data)
1205 if size is None or len(data) < size:
1205 if size is None or len(data) < size:
1206 if not self.consumed and self._pos:
1206 if not self.consumed and self._pos:
1207 self.ui.debug('bundle2-input-part: total payload size %i\n'
1207 self.ui.debug('bundle2-input-part: total payload size %i\n'
1208 % self._pos)
1208 % self._pos)
1209 self.consumed = True
1209 self.consumed = True
1210 return data
1210 return data
1211
1211
1212 def tell(self):
1212 def tell(self):
1213 return self._pos
1213 return self._pos
1214
1214
1215 def seek(self, offset, whence=0):
1215 def seek(self, offset, whence=0):
1216 if whence == 0:
1216 if whence == 0:
1217 newpos = offset
1217 newpos = offset
1218 elif whence == 1:
1218 elif whence == 1:
1219 newpos = self._pos + offset
1219 newpos = self._pos + offset
1220 elif whence == 2:
1220 elif whence == 2:
1221 if not self.consumed:
1221 if not self.consumed:
1222 self.read()
1222 self.read()
1223 newpos = self._chunkindex[-1][0] - offset
1223 newpos = self._chunkindex[-1][0] - offset
1224 else:
1224 else:
1225 raise ValueError('Unknown whence value: %r' % (whence,))
1225 raise ValueError('Unknown whence value: %r' % (whence,))
1226
1226
1227 if newpos > self._chunkindex[-1][0] and not self.consumed:
1227 if newpos > self._chunkindex[-1][0] and not self.consumed:
1228 self.read()
1228 self.read()
1229 if not 0 <= newpos <= self._chunkindex[-1][0]:
1229 if not 0 <= newpos <= self._chunkindex[-1][0]:
1230 raise ValueError('Offset out of range')
1230 raise ValueError('Offset out of range')
1231
1231
1232 if self._pos != newpos:
1232 if self._pos != newpos:
1233 chunk, internaloffset = self._findchunk(newpos)
1233 chunk, internaloffset = self._findchunk(newpos)
1234 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1234 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1235 adjust = self.read(internaloffset)
1235 adjust = self.read(internaloffset)
1236 if len(adjust) != internaloffset:
1236 if len(adjust) != internaloffset:
1237 raise error.Abort(_('Seek failed\n'))
1237 raise error.Abort(_('Seek failed\n'))
1238 self._pos = newpos
1238 self._pos = newpos
1239
1239
1240 # These are only the static capabilities.
1240 # These are only the static capabilities.
1241 # Check the 'getrepocaps' function for the rest.
1241 # Check the 'getrepocaps' function for the rest.
1242 capabilities = {'HG20': (),
1242 capabilities = {'HG20': (),
1243 'error': ('abort', 'unsupportedcontent', 'pushraced',
1243 'error': ('abort', 'unsupportedcontent', 'pushraced',
1244 'pushkey'),
1244 'pushkey'),
1245 'listkeys': (),
1245 'listkeys': (),
1246 'pushkey': (),
1246 'pushkey': (),
1247 'digests': tuple(sorted(util.DIGESTS.keys())),
1247 'digests': tuple(sorted(util.DIGESTS.keys())),
1248 'remote-changegroup': ('http', 'https'),
1248 'remote-changegroup': ('http', 'https'),
1249 'hgtagsfnodes': (),
1249 'hgtagsfnodes': (),
1250 }
1250 }
1251
1251
1252 def getrepocaps(repo, allowpushback=False):
1252 def getrepocaps(repo, allowpushback=False):
1253 """return the bundle2 capabilities for a given repo
1253 """return the bundle2 capabilities for a given repo
1254
1254
1255 Exists to allow extensions (like evolution) to mutate the capabilities.
1255 Exists to allow extensions (like evolution) to mutate the capabilities.
1256 """
1256 """
1257 caps = capabilities.copy()
1257 caps = capabilities.copy()
1258 caps['changegroup'] = tuple(sorted(
1258 caps['changegroup'] = tuple(sorted(
1259 changegroup.supportedincomingversions(repo)))
1259 changegroup.supportedincomingversions(repo)))
1260 if obsolete.isenabled(repo, obsolete.exchangeopt):
1260 if obsolete.isenabled(repo, obsolete.exchangeopt):
1261 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1261 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1262 caps['obsmarkers'] = supportedformat
1262 caps['obsmarkers'] = supportedformat
1263 if allowpushback:
1263 if allowpushback:
1264 caps['pushback'] = ()
1264 caps['pushback'] = ()
1265 return caps
1265 return caps
1266
1266
1267 def bundle2caps(remote):
1267 def bundle2caps(remote):
1268 """return the bundle capabilities of a peer as dict"""
1268 """return the bundle capabilities of a peer as dict"""
1269 raw = remote.capable('bundle2')
1269 raw = remote.capable('bundle2')
1270 if not raw and raw != '':
1270 if not raw and raw != '':
1271 return {}
1271 return {}
1272 capsblob = urllib.unquote(remote.capable('bundle2'))
1272 capsblob = urllib.unquote(remote.capable('bundle2'))
1273 return decodecaps(capsblob)
1273 return decodecaps(capsblob)
1274
1274
1275 def obsmarkersversion(caps):
1275 def obsmarkersversion(caps):
1276 """extract the list of supported obsmarkers versions from a bundle2caps dict
1276 """extract the list of supported obsmarkers versions from a bundle2caps dict
1277 """
1277 """
1278 obscaps = caps.get('obsmarkers', ())
1278 obscaps = caps.get('obsmarkers', ())
1279 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1279 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1280
1280
1281 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
1281 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
1282 """Write a bundle file and return its filename.
1282 """Write a bundle file and return its filename.
1283
1283
1284 Existing files will not be overwritten.
1284 Existing files will not be overwritten.
1285 If no filename is specified, a temporary file is created.
1285 If no filename is specified, a temporary file is created.
1286 bz2 compression can be turned off.
1286 bz2 compression can be turned off.
1287 The bundle file will be deleted in case of errors.
1287 The bundle file will be deleted in case of errors.
1288 """
1288 """
1289
1289
1290 if bundletype == "HG20":
1290 if bundletype == "HG20":
1291 bundle = bundle20(ui)
1291 bundle = bundle20(ui)
1292 bundle.setcompression(compression)
1292 bundle.setcompression(compression)
1293 part = bundle.newpart('changegroup', data=cg.getchunks())
1293 part = bundle.newpart('changegroup', data=cg.getchunks())
1294 part.addparam('version', cg.version)
1294 part.addparam('version', cg.version)
1295 chunkiter = bundle.getchunks()
1295 chunkiter = bundle.getchunks()
1296 else:
1296 else:
1297 # compression argument is only for the bundle2 case
1297 # compression argument is only for the bundle2 case
1298 assert compression is None
1298 assert compression is None
1299 if cg.version != '01':
1299 if cg.version != '01':
1300 raise error.Abort(_('old bundle types only supports v1 '
1300 raise error.Abort(_('old bundle types only supports v1 '
1301 'changegroups'))
1301 'changegroups'))
1302 header, comp = bundletypes[bundletype]
1302 header, comp = bundletypes[bundletype]
1303 if comp not in util.compressors:
1303 if comp not in util.compressors:
1304 raise error.Abort(_('unknown stream compression type: %s')
1304 raise error.Abort(_('unknown stream compression type: %s')
1305 % comp)
1305 % comp)
1306 z = util.compressors[comp]()
1306 z = util.compressors[comp]()
1307 subchunkiter = cg.getchunks()
1307 subchunkiter = cg.getchunks()
1308 def chunkiter():
1308 def chunkiter():
1309 yield header
1309 yield header
1310 for chunk in subchunkiter:
1310 for chunk in subchunkiter:
1311 yield z.compress(chunk)
1311 yield z.compress(chunk)
1312 yield z.flush()
1312 yield z.flush()
1313 chunkiter = chunkiter()
1313 chunkiter = chunkiter()
1314
1314
1315 # parse the changegroup data, otherwise we will block
1315 # parse the changegroup data, otherwise we will block
1316 # in case of sshrepo because we don't know the end of the stream
1316 # in case of sshrepo because we don't know the end of the stream
1317
1318 # an empty chunkgroup is the end of the changegroup
1319 # a changegroup has at least 2 chunkgroups (changelog and manifest).
1320 # after that, an empty chunkgroup is the end of the changegroup
1321 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1317 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1322
1318
1323 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1319 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1324 def handlechangegroup(op, inpart):
1320 def handlechangegroup(op, inpart):
1325 """apply a changegroup part on the repo
1321 """apply a changegroup part on the repo
1326
1322
1327 This is a very early implementation that will massive rework before being
1323 This is a very early implementation that will massive rework before being
1328 inflicted to any end-user.
1324 inflicted to any end-user.
1329 """
1325 """
1330 # Make sure we trigger a transaction creation
1326 # Make sure we trigger a transaction creation
1331 #
1327 #
1332 # The addchangegroup function will get a transaction object by itself, but
1328 # The addchangegroup function will get a transaction object by itself, but
1333 # we need to make sure we trigger the creation of a transaction object used
1329 # we need to make sure we trigger the creation of a transaction object used
1334 # for the whole processing scope.
1330 # for the whole processing scope.
1335 op.gettransaction()
1331 op.gettransaction()
1336 unpackerversion = inpart.params.get('version', '01')
1332 unpackerversion = inpart.params.get('version', '01')
1337 # We should raise an appropriate exception here
1333 # We should raise an appropriate exception here
1338 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1334 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1339 # the source and url passed here are overwritten by the one contained in
1335 # the source and url passed here are overwritten by the one contained in
1340 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1336 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1341 nbchangesets = None
1337 nbchangesets = None
1342 if 'nbchanges' in inpart.params:
1338 if 'nbchanges' in inpart.params:
1343 nbchangesets = int(inpart.params.get('nbchanges'))
1339 nbchangesets = int(inpart.params.get('nbchanges'))
1344 if ('treemanifest' in inpart.params and
1340 if ('treemanifest' in inpart.params and
1345 'treemanifest' not in op.repo.requirements):
1341 'treemanifest' not in op.repo.requirements):
1346 if len(op.repo.changelog) != 0:
1342 if len(op.repo.changelog) != 0:
1347 raise error.Abort(_(
1343 raise error.Abort(_(
1348 "bundle contains tree manifests, but local repo is "
1344 "bundle contains tree manifests, but local repo is "
1349 "non-empty and does not use tree manifests"))
1345 "non-empty and does not use tree manifests"))
1350 op.repo.requirements.add('treemanifest')
1346 op.repo.requirements.add('treemanifest')
1351 op.repo._applyopenerreqs()
1347 op.repo._applyopenerreqs()
1352 op.repo._writerequirements()
1348 op.repo._writerequirements()
1353 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1349 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1354 op.records.add('changegroup', {'return': ret})
1350 op.records.add('changegroup', {'return': ret})
1355 if op.reply is not None:
1351 if op.reply is not None:
1356 # This is definitely not the final form of this
1352 # This is definitely not the final form of this
1357 # return. But one need to start somewhere.
1353 # return. But one need to start somewhere.
1358 part = op.reply.newpart('reply:changegroup', mandatory=False)
1354 part = op.reply.newpart('reply:changegroup', mandatory=False)
1359 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1355 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1360 part.addparam('return', '%i' % ret, mandatory=False)
1356 part.addparam('return', '%i' % ret, mandatory=False)
1361 assert not inpart.read()
1357 assert not inpart.read()
1362
1358
1363 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1359 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1364 ['digest:%s' % k for k in util.DIGESTS.keys()])
1360 ['digest:%s' % k for k in util.DIGESTS.keys()])
1365 @parthandler('remote-changegroup', _remotechangegroupparams)
1361 @parthandler('remote-changegroup', _remotechangegroupparams)
1366 def handleremotechangegroup(op, inpart):
1362 def handleremotechangegroup(op, inpart):
1367 """apply a bundle10 on the repo, given an url and validation information
1363 """apply a bundle10 on the repo, given an url and validation information
1368
1364
1369 All the information about the remote bundle to import are given as
1365 All the information about the remote bundle to import are given as
1370 parameters. The parameters include:
1366 parameters. The parameters include:
1371 - url: the url to the bundle10.
1367 - url: the url to the bundle10.
1372 - size: the bundle10 file size. It is used to validate what was
1368 - size: the bundle10 file size. It is used to validate what was
1373 retrieved by the client matches the server knowledge about the bundle.
1369 retrieved by the client matches the server knowledge about the bundle.
1374 - digests: a space separated list of the digest types provided as
1370 - digests: a space separated list of the digest types provided as
1375 parameters.
1371 parameters.
1376 - digest:<digest-type>: the hexadecimal representation of the digest with
1372 - digest:<digest-type>: the hexadecimal representation of the digest with
1377 that name. Like the size, it is used to validate what was retrieved by
1373 that name. Like the size, it is used to validate what was retrieved by
1378 the client matches what the server knows about the bundle.
1374 the client matches what the server knows about the bundle.
1379
1375
1380 When multiple digest types are given, all of them are checked.
1376 When multiple digest types are given, all of them are checked.
1381 """
1377 """
1382 try:
1378 try:
1383 raw_url = inpart.params['url']
1379 raw_url = inpart.params['url']
1384 except KeyError:
1380 except KeyError:
1385 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1381 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1386 parsed_url = util.url(raw_url)
1382 parsed_url = util.url(raw_url)
1387 if parsed_url.scheme not in capabilities['remote-changegroup']:
1383 if parsed_url.scheme not in capabilities['remote-changegroup']:
1388 raise error.Abort(_('remote-changegroup does not support %s urls') %
1384 raise error.Abort(_('remote-changegroup does not support %s urls') %
1389 parsed_url.scheme)
1385 parsed_url.scheme)
1390
1386
1391 try:
1387 try:
1392 size = int(inpart.params['size'])
1388 size = int(inpart.params['size'])
1393 except ValueError:
1389 except ValueError:
1394 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1390 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1395 % 'size')
1391 % 'size')
1396 except KeyError:
1392 except KeyError:
1397 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1393 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1398
1394
1399 digests = {}
1395 digests = {}
1400 for typ in inpart.params.get('digests', '').split():
1396 for typ in inpart.params.get('digests', '').split():
1401 param = 'digest:%s' % typ
1397 param = 'digest:%s' % typ
1402 try:
1398 try:
1403 value = inpart.params[param]
1399 value = inpart.params[param]
1404 except KeyError:
1400 except KeyError:
1405 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1401 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1406 param)
1402 param)
1407 digests[typ] = value
1403 digests[typ] = value
1408
1404
1409 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1405 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1410
1406
1411 # Make sure we trigger a transaction creation
1407 # Make sure we trigger a transaction creation
1412 #
1408 #
1413 # The addchangegroup function will get a transaction object by itself, but
1409 # The addchangegroup function will get a transaction object by itself, but
1414 # we need to make sure we trigger the creation of a transaction object used
1410 # we need to make sure we trigger the creation of a transaction object used
1415 # for the whole processing scope.
1411 # for the whole processing scope.
1416 op.gettransaction()
1412 op.gettransaction()
1417 from . import exchange
1413 from . import exchange
1418 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1414 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1419 if not isinstance(cg, changegroup.cg1unpacker):
1415 if not isinstance(cg, changegroup.cg1unpacker):
1420 raise error.Abort(_('%s: not a bundle version 1.0') %
1416 raise error.Abort(_('%s: not a bundle version 1.0') %
1421 util.hidepassword(raw_url))
1417 util.hidepassword(raw_url))
1422 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1418 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1423 op.records.add('changegroup', {'return': ret})
1419 op.records.add('changegroup', {'return': ret})
1424 if op.reply is not None:
1420 if op.reply is not None:
1425 # This is definitely not the final form of this
1421 # This is definitely not the final form of this
1426 # return. But one need to start somewhere.
1422 # return. But one need to start somewhere.
1427 part = op.reply.newpart('reply:changegroup')
1423 part = op.reply.newpart('reply:changegroup')
1428 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1424 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1429 part.addparam('return', '%i' % ret, mandatory=False)
1425 part.addparam('return', '%i' % ret, mandatory=False)
1430 try:
1426 try:
1431 real_part.validate()
1427 real_part.validate()
1432 except error.Abort as e:
1428 except error.Abort as e:
1433 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1429 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1434 (util.hidepassword(raw_url), str(e)))
1430 (util.hidepassword(raw_url), str(e)))
1435 assert not inpart.read()
1431 assert not inpart.read()
1436
1432
1437 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1433 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1438 def handlereplychangegroup(op, inpart):
1434 def handlereplychangegroup(op, inpart):
1439 ret = int(inpart.params['return'])
1435 ret = int(inpart.params['return'])
1440 replyto = int(inpart.params['in-reply-to'])
1436 replyto = int(inpart.params['in-reply-to'])
1441 op.records.add('changegroup', {'return': ret}, replyto)
1437 op.records.add('changegroup', {'return': ret}, replyto)
1442
1438
1443 @parthandler('check:heads')
1439 @parthandler('check:heads')
1444 def handlecheckheads(op, inpart):
1440 def handlecheckheads(op, inpart):
1445 """check that head of the repo did not change
1441 """check that head of the repo did not change
1446
1442
1447 This is used to detect a push race when using unbundle.
1443 This is used to detect a push race when using unbundle.
1448 This replaces the "heads" argument of unbundle."""
1444 This replaces the "heads" argument of unbundle."""
1449 h = inpart.read(20)
1445 h = inpart.read(20)
1450 heads = []
1446 heads = []
1451 while len(h) == 20:
1447 while len(h) == 20:
1452 heads.append(h)
1448 heads.append(h)
1453 h = inpart.read(20)
1449 h = inpart.read(20)
1454 assert not h
1450 assert not h
1455 # Trigger a transaction so that we are guaranteed to have the lock now.
1451 # Trigger a transaction so that we are guaranteed to have the lock now.
1456 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1452 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1457 op.gettransaction()
1453 op.gettransaction()
1458 if heads != op.repo.heads():
1454 if heads != op.repo.heads():
1459 raise error.PushRaced('repository changed while pushing - '
1455 raise error.PushRaced('repository changed while pushing - '
1460 'please try again')
1456 'please try again')
1461
1457
1462 @parthandler('output')
1458 @parthandler('output')
1463 def handleoutput(op, inpart):
1459 def handleoutput(op, inpart):
1464 """forward output captured on the server to the client"""
1460 """forward output captured on the server to the client"""
1465 for line in inpart.read().splitlines():
1461 for line in inpart.read().splitlines():
1466 op.ui.status(('remote: %s\n' % line))
1462 op.ui.status(('remote: %s\n' % line))
1467
1463
1468 @parthandler('replycaps')
1464 @parthandler('replycaps')
1469 def handlereplycaps(op, inpart):
1465 def handlereplycaps(op, inpart):
1470 """Notify that a reply bundle should be created
1466 """Notify that a reply bundle should be created
1471
1467
1472 The payload contains the capabilities information for the reply"""
1468 The payload contains the capabilities information for the reply"""
1473 caps = decodecaps(inpart.read())
1469 caps = decodecaps(inpart.read())
1474 if op.reply is None:
1470 if op.reply is None:
1475 op.reply = bundle20(op.ui, caps)
1471 op.reply = bundle20(op.ui, caps)
1476
1472
1477 class AbortFromPart(error.Abort):
1473 class AbortFromPart(error.Abort):
1478 """Sub-class of Abort that denotes an error from a bundle2 part."""
1474 """Sub-class of Abort that denotes an error from a bundle2 part."""
1479
1475
1480 @parthandler('error:abort', ('message', 'hint'))
1476 @parthandler('error:abort', ('message', 'hint'))
1481 def handleerrorabort(op, inpart):
1477 def handleerrorabort(op, inpart):
1482 """Used to transmit abort error over the wire"""
1478 """Used to transmit abort error over the wire"""
1483 raise AbortFromPart(inpart.params['message'],
1479 raise AbortFromPart(inpart.params['message'],
1484 hint=inpart.params.get('hint'))
1480 hint=inpart.params.get('hint'))
1485
1481
1486 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1482 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1487 'in-reply-to'))
1483 'in-reply-to'))
1488 def handleerrorpushkey(op, inpart):
1484 def handleerrorpushkey(op, inpart):
1489 """Used to transmit failure of a mandatory pushkey over the wire"""
1485 """Used to transmit failure of a mandatory pushkey over the wire"""
1490 kwargs = {}
1486 kwargs = {}
1491 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1487 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1492 value = inpart.params.get(name)
1488 value = inpart.params.get(name)
1493 if value is not None:
1489 if value is not None:
1494 kwargs[name] = value
1490 kwargs[name] = value
1495 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1491 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1496
1492
1497 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1493 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1498 def handleerrorunsupportedcontent(op, inpart):
1494 def handleerrorunsupportedcontent(op, inpart):
1499 """Used to transmit unknown content error over the wire"""
1495 """Used to transmit unknown content error over the wire"""
1500 kwargs = {}
1496 kwargs = {}
1501 parttype = inpart.params.get('parttype')
1497 parttype = inpart.params.get('parttype')
1502 if parttype is not None:
1498 if parttype is not None:
1503 kwargs['parttype'] = parttype
1499 kwargs['parttype'] = parttype
1504 params = inpart.params.get('params')
1500 params = inpart.params.get('params')
1505 if params is not None:
1501 if params is not None:
1506 kwargs['params'] = params.split('\0')
1502 kwargs['params'] = params.split('\0')
1507
1503
1508 raise error.BundleUnknownFeatureError(**kwargs)
1504 raise error.BundleUnknownFeatureError(**kwargs)
1509
1505
1510 @parthandler('error:pushraced', ('message',))
1506 @parthandler('error:pushraced', ('message',))
1511 def handleerrorpushraced(op, inpart):
1507 def handleerrorpushraced(op, inpart):
1512 """Used to transmit push race error over the wire"""
1508 """Used to transmit push race error over the wire"""
1513 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1509 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1514
1510
1515 @parthandler('listkeys', ('namespace',))
1511 @parthandler('listkeys', ('namespace',))
1516 def handlelistkeys(op, inpart):
1512 def handlelistkeys(op, inpart):
1517 """retrieve pushkey namespace content stored in a bundle2"""
1513 """retrieve pushkey namespace content stored in a bundle2"""
1518 namespace = inpart.params['namespace']
1514 namespace = inpart.params['namespace']
1519 r = pushkey.decodekeys(inpart.read())
1515 r = pushkey.decodekeys(inpart.read())
1520 op.records.add('listkeys', (namespace, r))
1516 op.records.add('listkeys', (namespace, r))
1521
1517
1522 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1518 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1523 def handlepushkey(op, inpart):
1519 def handlepushkey(op, inpart):
1524 """process a pushkey request"""
1520 """process a pushkey request"""
1525 dec = pushkey.decode
1521 dec = pushkey.decode
1526 namespace = dec(inpart.params['namespace'])
1522 namespace = dec(inpart.params['namespace'])
1527 key = dec(inpart.params['key'])
1523 key = dec(inpart.params['key'])
1528 old = dec(inpart.params['old'])
1524 old = dec(inpart.params['old'])
1529 new = dec(inpart.params['new'])
1525 new = dec(inpart.params['new'])
1530 # Grab the transaction to ensure that we have the lock before performing the
1526 # Grab the transaction to ensure that we have the lock before performing the
1531 # pushkey.
1527 # pushkey.
1532 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1528 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1533 op.gettransaction()
1529 op.gettransaction()
1534 ret = op.repo.pushkey(namespace, key, old, new)
1530 ret = op.repo.pushkey(namespace, key, old, new)
1535 record = {'namespace': namespace,
1531 record = {'namespace': namespace,
1536 'key': key,
1532 'key': key,
1537 'old': old,
1533 'old': old,
1538 'new': new}
1534 'new': new}
1539 op.records.add('pushkey', record)
1535 op.records.add('pushkey', record)
1540 if op.reply is not None:
1536 if op.reply is not None:
1541 rpart = op.reply.newpart('reply:pushkey')
1537 rpart = op.reply.newpart('reply:pushkey')
1542 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1538 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1543 rpart.addparam('return', '%i' % ret, mandatory=False)
1539 rpart.addparam('return', '%i' % ret, mandatory=False)
1544 if inpart.mandatory and not ret:
1540 if inpart.mandatory and not ret:
1545 kwargs = {}
1541 kwargs = {}
1546 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1542 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1547 if key in inpart.params:
1543 if key in inpart.params:
1548 kwargs[key] = inpart.params[key]
1544 kwargs[key] = inpart.params[key]
1549 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1545 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1550
1546
1551 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1547 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1552 def handlepushkeyreply(op, inpart):
1548 def handlepushkeyreply(op, inpart):
1553 """retrieve the result of a pushkey request"""
1549 """retrieve the result of a pushkey request"""
1554 ret = int(inpart.params['return'])
1550 ret = int(inpart.params['return'])
1555 partid = int(inpart.params['in-reply-to'])
1551 partid = int(inpart.params['in-reply-to'])
1556 op.records.add('pushkey', {'return': ret}, partid)
1552 op.records.add('pushkey', {'return': ret}, partid)
1557
1553
1558 @parthandler('obsmarkers')
1554 @parthandler('obsmarkers')
1559 def handleobsmarker(op, inpart):
1555 def handleobsmarker(op, inpart):
1560 """add a stream of obsmarkers to the repo"""
1556 """add a stream of obsmarkers to the repo"""
1561 tr = op.gettransaction()
1557 tr = op.gettransaction()
1562 markerdata = inpart.read()
1558 markerdata = inpart.read()
1563 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1559 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1564 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1560 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1565 % len(markerdata))
1561 % len(markerdata))
1566 # The mergemarkers call will crash if marker creation is not enabled.
1562 # The mergemarkers call will crash if marker creation is not enabled.
1567 # we want to avoid this if the part is advisory.
1563 # we want to avoid this if the part is advisory.
1568 if not inpart.mandatory and op.repo.obsstore.readonly:
1564 if not inpart.mandatory and op.repo.obsstore.readonly:
1569 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1565 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1570 return
1566 return
1571 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1567 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1572 if new:
1568 if new:
1573 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1569 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1574 op.records.add('obsmarkers', {'new': new})
1570 op.records.add('obsmarkers', {'new': new})
1575 if op.reply is not None:
1571 if op.reply is not None:
1576 rpart = op.reply.newpart('reply:obsmarkers')
1572 rpart = op.reply.newpart('reply:obsmarkers')
1577 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1573 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1578 rpart.addparam('new', '%i' % new, mandatory=False)
1574 rpart.addparam('new', '%i' % new, mandatory=False)
1579
1575
1580
1576
1581 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1577 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1582 def handleobsmarkerreply(op, inpart):
1578 def handleobsmarkerreply(op, inpart):
1583 """retrieve the result of a pushkey request"""
1579 """retrieve the result of a pushkey request"""
1584 ret = int(inpart.params['new'])
1580 ret = int(inpart.params['new'])
1585 partid = int(inpart.params['in-reply-to'])
1581 partid = int(inpart.params['in-reply-to'])
1586 op.records.add('obsmarkers', {'new': ret}, partid)
1582 op.records.add('obsmarkers', {'new': ret}, partid)
1587
1583
1588 @parthandler('hgtagsfnodes')
1584 @parthandler('hgtagsfnodes')
1589 def handlehgtagsfnodes(op, inpart):
1585 def handlehgtagsfnodes(op, inpart):
1590 """Applies .hgtags fnodes cache entries to the local repo.
1586 """Applies .hgtags fnodes cache entries to the local repo.
1591
1587
1592 Payload is pairs of 20 byte changeset nodes and filenodes.
1588 Payload is pairs of 20 byte changeset nodes and filenodes.
1593 """
1589 """
1594 # Grab the transaction so we ensure that we have the lock at this point.
1590 # Grab the transaction so we ensure that we have the lock at this point.
1595 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1591 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1596 op.gettransaction()
1592 op.gettransaction()
1597 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1593 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1598
1594
1599 count = 0
1595 count = 0
1600 while True:
1596 while True:
1601 node = inpart.read(20)
1597 node = inpart.read(20)
1602 fnode = inpart.read(20)
1598 fnode = inpart.read(20)
1603 if len(node) < 20 or len(fnode) < 20:
1599 if len(node) < 20 or len(fnode) < 20:
1604 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1600 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1605 break
1601 break
1606 cache.setfnode(node, fnode)
1602 cache.setfnode(node, fnode)
1607 count += 1
1603 count += 1
1608
1604
1609 cache.write()
1605 cache.write()
1610 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1606 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
General Comments 0
You need to be logged in to leave comments. Login now