bundle2: clarify in docstring that header size is for a single header...
Martin von Zweigbergk
r25507:5bee4837 default
@@ -1,1406 +1,1406 @@
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomic packet to transmit a set of
9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 payloads in an application-agnostic way. It consists of a sequence of "parts"
10 payloads in an application-agnostic way. It consists of a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architected as follows:
17 The format is architected as follows:
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 The binary format
24 The binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 The binary format is as follows:
32 The binary format is as follows:
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty names are obviously forbidden.
46 Empty names are obviously forbidden.
47
47
48 Names MUST start with a letter. If the first letter is lower case, the
48 Names MUST start with a letter. If the first letter is lower case, the
49 parameter is advisory and can be safely ignored. However, when the first
49 parameter is advisory and can be safely ignored. However, when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if it is not able to process it.
51 stop if it is not able to process it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allows easy human inspection of a bundle2 header in case of
57 - Textual data allows easy human inspection of a bundle2 header in case of
58 trouble.
58 trouble.
59
59
60 Any application-level options MUST go into a bundle2 part instead.
60 Any application-level options MUST go into a bundle2 part instead.
61
61
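The stream-level layout above is simple enough to decode by hand. Below is a minimal reader sketch, not part of this module's API (the helper name and the standalone file object are illustrative; it assumes `fp` is positioned just after the 'HG20' magic string):

    import struct
    import urllib

    def readstreamparams(fp):
        # :params size: int32, total number of bytes used by the parameters
        size = struct.unpack('>i', fp.read(4))[0]
        params = {}
        if size:
            # space separated list of urlquoted entries, `<name>` or `<name>=<value>`
            for entry in fp.read(size).split(' '):
                if '=' in entry:
                    name, value = entry.split('=', 1)
                    value = urllib.unquote(value)
                else:
                    name, value = entry, None
                params[urllib.unquote(name)] = value
        return params
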
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part header. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two pieces of
74 The header defines how to interpret the part. It contains two pieces of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route the part to an application level handler
77 The part type is used to route the part to an application level handler
78 that can interpret the payload.
78 that can interpret the payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is as follows:
84 The binary format of the header is as follows:
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 A part's parameters may have arbitrary content; the binary structure is::
95 A part's parameters may have arbitrary content; the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couples of bytes, where N is the total number of parameters. Each
105 N couples of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters come first, then the advisory ones.
114 Mandatory parameters come first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
123 `chunksize` says). The payload part is concluded by a zero size chunk.
123 `chunksize` says). The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
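To make the part layout above concrete, here is a hedged, standalone decoding sketch (not the `unbundlepart` machinery defined later in this file; the function names are illustrative):

    import struct

    def parsepartheader(blob):
        # `blob` is the <header size> bytes announced by the int32 header size field
        offset = 0
        (typesize,) = struct.unpack_from('>B', blob, offset); offset += 1
        parttype = blob[offset:offset + typesize]; offset += typesize
        (partid,) = struct.unpack_from('>I', blob, offset); offset += 4
        mancount, advcount = struct.unpack_from('>BB', blob, offset); offset += 2
        total = mancount + advcount
        sizes = struct.unpack_from('>' + 'BB' * total, blob, offset)
        offset += 2 * total
        params = []
        for i in xrange(total):
            ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
            key = blob[offset:offset + ksize]; offset += ksize
            value = blob[offset:offset + vsize]; offset += vsize
            params.append((key, value))
        # mandatory parameters come first, then the advisory ones
        return parttype, partid, params[:mancount], params[mancount:]

    def iterpayloadchunks(fp):
        # payload is a series of <int32 chunksize><chunkdata>, ended by a zero size chunk
        while True:
            (size,) = struct.unpack('>i', fp.read(4))
            if size == 0:
                break
            if size < 0:
                raise ValueError('special chunk sizes are not handled in this sketch')
            yield fp.read(size)
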
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handlers are registered
134 Each part is processed in order using a "part handler". Handlers are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable, but none of the parts read after an abort are processed. In the
143 channel usable, but none of the parts read after an abort are processed. In the
144 future, dropping the stream may become an option for channels we do not care to
144 future, dropping the stream may become an option for channels we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 import errno
148 import errno
149 import sys
149 import sys
150 import util
150 import util
151 import struct
151 import struct
152 import urllib
152 import urllib
153 import string
153 import string
154 import obsolete
154 import obsolete
155 import pushkey
155 import pushkey
156 import url
156 import url
157 import re
157 import re
158
158
159 import changegroup, error, tags
159 import changegroup, error, tags
160 from i18n import _
160 from i18n import _
161
161
162 _pack = struct.pack
162 _pack = struct.pack
163 _unpack = struct.unpack
163 _unpack = struct.unpack
164
164
165 _fstreamparamsize = '>i'
165 _fstreamparamsize = '>i'
166 _fpartheadersize = '>i'
166 _fpartheadersize = '>i'
167 _fparttypesize = '>B'
167 _fparttypesize = '>B'
168 _fpartid = '>I'
168 _fpartid = '>I'
169 _fpayloadsize = '>i'
169 _fpayloadsize = '>i'
170 _fpartparamcount = '>BB'
170 _fpartparamcount = '>BB'
171
171
172 preferedchunksize = 4096
172 preferedchunksize = 4096
173
173
174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
175
175
176 def outdebug(ui, message):
176 def outdebug(ui, message):
177 """debug regarding output stream (bundling)"""
177 """debug regarding output stream (bundling)"""
178 if ui.configbool('devel', 'bundle2.debug', False):
178 if ui.configbool('devel', 'bundle2.debug', False):
179 ui.debug('bundle2-output: %s\n' % message)
179 ui.debug('bundle2-output: %s\n' % message)
180
180
181 def indebug(ui, message):
181 def indebug(ui, message):
182 """debug on input stream (unbundling)"""
182 """debug on input stream (unbundling)"""
183 if ui.configbool('devel', 'bundle2.debug', False):
183 if ui.configbool('devel', 'bundle2.debug', False):
184 ui.debug('bundle2-input: %s\n' % message)
184 ui.debug('bundle2-input: %s\n' % message)
185
185
186 def validateparttype(parttype):
186 def validateparttype(parttype):
187 """raise ValueError if a parttype contains invalid character"""
187 """raise ValueError if a parttype contains invalid character"""
188 if _parttypeforbidden.search(parttype):
188 if _parttypeforbidden.search(parttype):
189 raise ValueError(parttype)
189 raise ValueError(parttype)
190
190
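For illustration, the character restriction above translates to calls like these (the last one is expected to raise):

    validateparttype('changegroup')     # plain alphanumerical name: accepted
    validateparttype('b2x:listkeys')    # ':' is in the allowed set: accepted
    validateparttype('bad part')        # space is forbidden: raises ValueError
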
191 def _makefpartparamsizes(nbparams):
191 def _makefpartparamsizes(nbparams):
192 """return a struct format to read part parameter sizes
192 """return a struct format to read part parameter sizes
193
193
194 The number parameters is variable so we need to build that format
194 The number parameters is variable so we need to build that format
195 dynamically.
195 dynamically.
196 """
196 """
197 return '>'+('BB'*nbparams)
197 return '>'+('BB'*nbparams)
198
198
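A small usage sketch of the dynamic format built above (the byte values are made up):

    fmt = _makefpartparamsizes(2)                     # '>BBBB'
    sizes = struct.unpack(fmt, '\x02\x05\x03\x00')    # -> (2, 5, 3, 0)
    # first param: 2-byte key, 5-byte value; second param: 3-byte key, empty value
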
199 parthandlermapping = {}
199 parthandlermapping = {}
200
200
201 def parthandler(parttype, params=()):
201 def parthandler(parttype, params=()):
202 """decorator that register a function as a bundle2 part handler
202 """decorator that register a function as a bundle2 part handler
203
203
204 eg::
204 eg::
205
205
206 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
206 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
207 def myparttypehandler(...):
207 def myparttypehandler(...):
208 '''process a part of type "my part".'''
208 '''process a part of type "my part".'''
209 ...
209 ...
210 """
210 """
211 validateparttype(parttype)
211 validateparttype(parttype)
212 def _decorator(func):
212 def _decorator(func):
213 lparttype = parttype.lower() # enforce lower case matching.
213 lparttype = parttype.lower() # enforce lower case matching.
214 assert lparttype not in parthandlermapping
214 assert lparttype not in parthandlermapping
215 parthandlermapping[lparttype] = func
215 parthandlermapping[lparttype] = func
216 func.params = frozenset(params)
216 func.params = frozenset(params)
217 return func
217 return func
218 return _decorator
218 return _decorator
219
219
220 class unbundlerecords(object):
220 class unbundlerecords(object):
221 """keep record of what happens during and unbundle
221 """keep record of what happens during and unbundle
222
222
223 New records are added using `records.add('cat', obj)`. Where 'cat' is a
223 New records are added using `records.add('cat', obj)`. Where 'cat' is a
224 category of record and obj is an arbitrary object.
224 category of record and obj is an arbitrary object.
225
225
226 `records['cat']` will return all entries of this category 'cat'.
226 `records['cat']` will return all entries of this category 'cat'.
227
227
228 Iterating on the object itself will yield `('category', obj)` tuples
228 Iterating on the object itself will yield `('category', obj)` tuples
229 for all entries.
229 for all entries.
230
230
231 All iterations happen in chronological order.
231 All iterations happen in chronological order.
232 """
232 """
233
233
234 def __init__(self):
234 def __init__(self):
235 self._categories = {}
235 self._categories = {}
236 self._sequences = []
236 self._sequences = []
237 self._replies = {}
237 self._replies = {}
238
238
239 def add(self, category, entry, inreplyto=None):
239 def add(self, category, entry, inreplyto=None):
240 """add a new record of a given category.
240 """add a new record of a given category.
241
241
242 The entry can then be retrieved in the list returned by
242 The entry can then be retrieved in the list returned by
243 self['category']."""
243 self['category']."""
244 self._categories.setdefault(category, []).append(entry)
244 self._categories.setdefault(category, []).append(entry)
245 self._sequences.append((category, entry))
245 self._sequences.append((category, entry))
246 if inreplyto is not None:
246 if inreplyto is not None:
247 self.getreplies(inreplyto).add(category, entry)
247 self.getreplies(inreplyto).add(category, entry)
248
248
249 def getreplies(self, partid):
249 def getreplies(self, partid):
250 """get the records that are replies to a specific part"""
250 """get the records that are replies to a specific part"""
251 return self._replies.setdefault(partid, unbundlerecords())
251 return self._replies.setdefault(partid, unbundlerecords())
252
252
253 def __getitem__(self, cat):
253 def __getitem__(self, cat):
254 return tuple(self._categories.get(cat, ()))
254 return tuple(self._categories.get(cat, ()))
255
255
256 def __iter__(self):
256 def __iter__(self):
257 return iter(self._sequences)
257 return iter(self._sequences)
258
258
259 def __len__(self):
259 def __len__(self):
260 return len(self._sequences)
260 return len(self._sequences)
261
261
262 def __nonzero__(self):
262 def __nonzero__(self):
263 return bool(self._sequences)
263 return bool(self._sequences)
264
264
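A hypothetical usage sketch of the records container documented above (the category names and values are made up):

    records = unbundlerecords()
    records.add('changegroup', {'return': 1})
    records.add('output', 'remote: added 1 changesets\n', inreplyto=0)
    assert records['changegroup'] == ({'return': 1},)
    for category, entry in records:    # chronological iteration
        print category, entry          # Python 2 print statement, as in this module
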
265 class bundleoperation(object):
265 class bundleoperation(object):
266 """an object that represents a single bundling process
266 """an object that represents a single bundling process
267
267
268 Its purpose is to carry unbundle-related objects and states.
268 Its purpose is to carry unbundle-related objects and states.
269
269
270 A new object should be created at the beginning of each bundle processing.
270 A new object should be created at the beginning of each bundle processing.
271 The object is to be returned by the processing function.
271 The object is to be returned by the processing function.
272
272
273 The object has very little content now; it will ultimately contain:
273 The object has very little content now; it will ultimately contain:
274 * an access to the repo the bundle is applied to,
274 * an access to the repo the bundle is applied to,
275 * a ui object,
275 * a ui object,
276 * a way to retrieve a transaction to add changes to the repo,
276 * a way to retrieve a transaction to add changes to the repo,
277 * a way to record the result of processing each part,
277 * a way to record the result of processing each part,
278 * a way to construct a bundle response when applicable.
278 * a way to construct a bundle response when applicable.
279 """
279 """
280
280
281 def __init__(self, repo, transactiongetter, captureoutput=True):
281 def __init__(self, repo, transactiongetter, captureoutput=True):
282 self.repo = repo
282 self.repo = repo
283 self.ui = repo.ui
283 self.ui = repo.ui
284 self.records = unbundlerecords()
284 self.records = unbundlerecords()
285 self.gettransaction = transactiongetter
285 self.gettransaction = transactiongetter
286 self.reply = None
286 self.reply = None
287 self.captureoutput = captureoutput
287 self.captureoutput = captureoutput
288
288
289 class TransactionUnavailable(RuntimeError):
289 class TransactionUnavailable(RuntimeError):
290 pass
290 pass
291
291
292 def _notransaction():
292 def _notransaction():
293 """default method to get a transaction while processing a bundle
293 """default method to get a transaction while processing a bundle
294
294
295 Raise an exception to highlight the fact that no transaction was expected
295 Raise an exception to highlight the fact that no transaction was expected
296 to be created"""
296 to be created"""
297 raise TransactionUnavailable()
297 raise TransactionUnavailable()
298
298
299 def processbundle(repo, unbundler, transactiongetter=None, op=None):
299 def processbundle(repo, unbundler, transactiongetter=None, op=None):
300 """This function process a bundle, apply effect to/from a repo
300 """This function process a bundle, apply effect to/from a repo
301
301
302 It iterates over each part then searches for and uses the proper handling
302 It iterates over each part then searches for and uses the proper handling
303 code to process the part. Parts are processed in order.
303 code to process the part. Parts are processed in order.
304
304
305 This is a very early version of this function that will be strongly reworked
305 This is a very early version of this function that will be strongly reworked
306 before final usage.
306 before final usage.
307
307
308 An unknown mandatory part will abort the process.
308 An unknown mandatory part will abort the process.
309
309
310 It is temporarily possible to provide a prebuilt bundleoperation to the
310 It is temporarily possible to provide a prebuilt bundleoperation to the
311 function. This is used to ensure output is properly propagated in case of
311 function. This is used to ensure output is properly propagated in case of
312 an error during the unbundling. This output capturing part will likely be
312 an error during the unbundling. This output capturing part will likely be
313 reworked and this ability will probably go away in the process.
313 reworked and this ability will probably go away in the process.
314 """
314 """
315 if op is None:
315 if op is None:
316 if transactiongetter is None:
316 if transactiongetter is None:
317 transactiongetter = _notransaction
317 transactiongetter = _notransaction
318 op = bundleoperation(repo, transactiongetter)
318 op = bundleoperation(repo, transactiongetter)
319 # todo:
319 # todo:
320 # - replace this with an init function soon.
320 # - replace this with an init function soon.
321 # - exception catching
321 # - exception catching
322 unbundler.params
322 unbundler.params
323 if repo.ui.debugflag:
323 if repo.ui.debugflag:
324 msg = ['bundle2-input-bundle:']
324 msg = ['bundle2-input-bundle:']
325 if unbundler.params:
325 if unbundler.params:
326 msg.append(' %i params' % len(unbundler.params))
326 msg.append(' %i params' % len(unbundler.params))
327 if op.gettransaction is None:
327 if op.gettransaction is None:
328 msg.append(' no-transaction')
328 msg.append(' no-transaction')
329 else:
329 else:
330 msg.append(' with-transaction')
330 msg.append(' with-transaction')
331 msg.append('\n')
331 msg.append('\n')
332 repo.ui.debug(''.join(msg))
332 repo.ui.debug(''.join(msg))
333 iterparts = enumerate(unbundler.iterparts())
333 iterparts = enumerate(unbundler.iterparts())
334 part = None
334 part = None
335 nbpart = 0
335 nbpart = 0
336 try:
336 try:
337 for nbpart, part in iterparts:
337 for nbpart, part in iterparts:
338 _processpart(op, part)
338 _processpart(op, part)
339 except BaseException, exc:
339 except BaseException, exc:
340 for nbpart, part in iterparts:
340 for nbpart, part in iterparts:
341 # consume the bundle content
341 # consume the bundle content
342 part.seek(0, 2)
342 part.seek(0, 2)
343 # Small hack to let caller code distinguish exceptions from bundle2
343 # Small hack to let caller code distinguish exceptions from bundle2
344 # processing from processing the old format. This is mostly
344 # processing from processing the old format. This is mostly
345 # needed to handle different return codes to unbundle according to the
345 # needed to handle different return codes to unbundle according to the
346 # type of bundle. We should probably clean up or drop this return code
346 # type of bundle. We should probably clean up or drop this return code
347 # craziness in a future version.
347 # craziness in a future version.
348 exc.duringunbundle2 = True
348 exc.duringunbundle2 = True
349 salvaged = []
349 salvaged = []
350 replycaps = None
350 replycaps = None
351 if op.reply is not None:
351 if op.reply is not None:
352 salvaged = op.reply.salvageoutput()
352 salvaged = op.reply.salvageoutput()
353 replycaps = op.reply.capabilities
353 replycaps = op.reply.capabilities
354 exc._replycaps = replycaps
354 exc._replycaps = replycaps
355 exc._bundle2salvagedoutput = salvaged
355 exc._bundle2salvagedoutput = salvaged
356 raise
356 raise
357 finally:
357 finally:
358 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
358 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
359
359
360 return op
360 return op
361
361
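A hedged sketch of how a caller might drive processbundle() with a real transaction; the `repo` and `unbundler` objects are assumed to already exist and the transaction description is illustrative:

    def gettransaction():
        # assumes a localrepository object named `repo` is in scope
        return repo.transaction('unbundle')

    op = processbundle(repo, unbundler, transactiongetter=gettransaction)
    for record in op.records['changegroup']:
        repo.ui.status('changegroup result: %r\n' % (record,))
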
362 def _processpart(op, part):
362 def _processpart(op, part):
363 """process a single part from a bundle
363 """process a single part from a bundle
364
364
365 The part is guaranteed to have been fully consumed when the function exits
365 The part is guaranteed to have been fully consumed when the function exits
366 (even if an exception is raised)."""
366 (even if an exception is raised)."""
367 status = 'unknown' # used by debug output
367 status = 'unknown' # used by debug output
368 try:
368 try:
369 try:
369 try:
370 handler = parthandlermapping.get(part.type)
370 handler = parthandlermapping.get(part.type)
371 if handler is None:
371 if handler is None:
372 status = 'unsupported-type'
372 status = 'unsupported-type'
373 raise error.UnsupportedPartError(parttype=part.type)
373 raise error.UnsupportedPartError(parttype=part.type)
374 indebug(op.ui, 'found a handler for part %r' % part.type)
374 indebug(op.ui, 'found a handler for part %r' % part.type)
375 unknownparams = part.mandatorykeys - handler.params
375 unknownparams = part.mandatorykeys - handler.params
376 if unknownparams:
376 if unknownparams:
377 unknownparams = list(unknownparams)
377 unknownparams = list(unknownparams)
378 unknownparams.sort()
378 unknownparams.sort()
379 status = 'unsupported-params (%s)' % unknownparams
379 status = 'unsupported-params (%s)' % unknownparams
380 raise error.UnsupportedPartError(parttype=part.type,
380 raise error.UnsupportedPartError(parttype=part.type,
381 params=unknownparams)
381 params=unknownparams)
382 status = 'supported'
382 status = 'supported'
383 except error.UnsupportedPartError, exc:
383 except error.UnsupportedPartError, exc:
384 if part.mandatory: # mandatory parts
384 if part.mandatory: # mandatory parts
385 raise
385 raise
386 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
386 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
387 return # skip to part processing
387 return # skip to part processing
388 finally:
388 finally:
389 if op.ui.debugflag:
389 if op.ui.debugflag:
390 msg = ['bundle2-input-part: "%s"' % part.type]
390 msg = ['bundle2-input-part: "%s"' % part.type]
391 if not part.mandatory:
391 if not part.mandatory:
392 msg.append(' (advisory)')
392 msg.append(' (advisory)')
393 nbmp = len(part.mandatorykeys)
393 nbmp = len(part.mandatorykeys)
394 nbap = len(part.params) - nbmp
394 nbap = len(part.params) - nbmp
395 if nbmp or nbap:
395 if nbmp or nbap:
396 msg.append(' (params:')
396 msg.append(' (params:')
397 if nbmp:
397 if nbmp:
398 msg.append(' %i mandatory' % nbmp)
398 msg.append(' %i mandatory' % nbmp)
399 if nbap:
399 if nbap:
400 msg.append(' %i advisory' % nbap)
400 msg.append(' %i advisory' % nbap)
401 msg.append(')')
401 msg.append(')')
402 msg.append(' %s\n' % status)
402 msg.append(' %s\n' % status)
403 op.ui.debug(''.join(msg))
403 op.ui.debug(''.join(msg))
404
404
405 # handler is called outside the above try block so that we don't
405 # handler is called outside the above try block so that we don't
406 # risk catching KeyErrors from anything other than the
406 # risk catching KeyErrors from anything other than the
407 # parthandlermapping lookup (any KeyError raised by handler()
407 # parthandlermapping lookup (any KeyError raised by handler()
408 # itself represents a defect of a different variety).
408 # itself represents a defect of a different variety).
409 output = None
409 output = None
410 if op.captureoutput and op.reply is not None:
410 if op.captureoutput and op.reply is not None:
411 op.ui.pushbuffer(error=True, subproc=True)
411 op.ui.pushbuffer(error=True, subproc=True)
412 output = ''
412 output = ''
413 try:
413 try:
414 handler(op, part)
414 handler(op, part)
415 finally:
415 finally:
416 if output is not None:
416 if output is not None:
417 output = op.ui.popbuffer()
417 output = op.ui.popbuffer()
418 if output:
418 if output:
419 outpart = op.reply.newpart('output', data=output,
419 outpart = op.reply.newpart('output', data=output,
420 mandatory=False)
420 mandatory=False)
421 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
421 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
422 finally:
422 finally:
423 # consume the part content to not corrupt the stream.
423 # consume the part content to not corrupt the stream.
424 part.seek(0, 2)
424 part.seek(0, 2)
425
425
426
426
427 def decodecaps(blob):
427 def decodecaps(blob):
428 """decode a bundle2 caps bytes blob into a dictionary
428 """decode a bundle2 caps bytes blob into a dictionary
429
429
430 The blob is a list of capabilities (one per line)
430 The blob is a list of capabilities (one per line)
431 Capabilities may have values using a line of the form::
431 Capabilities may have values using a line of the form::
432
432
433 capability=value1,value2,value3
433 capability=value1,value2,value3
434
434
435 The values are always a list."""
435 The values are always a list."""
436 caps = {}
436 caps = {}
437 for line in blob.splitlines():
437 for line in blob.splitlines():
438 if not line:
438 if not line:
439 continue
439 continue
440 if '=' not in line:
440 if '=' not in line:
441 key, vals = line, ()
441 key, vals = line, ()
442 else:
442 else:
443 key, vals = line.split('=', 1)
443 key, vals = line.split('=', 1)
444 vals = vals.split(',')
444 vals = vals.split(',')
445 key = urllib.unquote(key)
445 key = urllib.unquote(key)
446 vals = [urllib.unquote(v) for v in vals]
446 vals = [urllib.unquote(v) for v in vals]
447 caps[key] = vals
447 caps[key] = vals
448 return caps
448 return caps
449
449
450 def encodecaps(caps):
450 def encodecaps(caps):
451 """encode a bundle2 caps dictionary into a bytes blob"""
451 """encode a bundle2 caps dictionary into a bytes blob"""
452 chunks = []
452 chunks = []
453 for ca in sorted(caps):
453 for ca in sorted(caps):
454 vals = caps[ca]
454 vals = caps[ca]
455 ca = urllib.quote(ca)
455 ca = urllib.quote(ca)
456 vals = [urllib.quote(v) for v in vals]
456 vals = [urllib.quote(v) for v in vals]
457 if vals:
457 if vals:
458 ca = "%s=%s" % (ca, ','.join(vals))
458 ca = "%s=%s" % (ca, ','.join(vals))
459 chunks.append(ca)
459 chunks.append(ca)
460 return '\n'.join(chunks)
460 return '\n'.join(chunks)
461
461
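A round-trip sketch for the two helpers above; note that a capability declared without values decodes back to an empty list:

    caps = {'HG20': (), 'changegroup': ['01', '02']}
    blob = encodecaps(caps)
    # blob == 'HG20\nchangegroup=01,02'
    assert decodecaps(blob) == {'HG20': [], 'changegroup': ['01', '02']}
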
462 class bundle20(object):
462 class bundle20(object):
463 """represent an outgoing bundle2 container
463 """represent an outgoing bundle2 container
464
464
465 Use the `addparam` method to add stream level parameters and `newpart` to
465 Use the `addparam` method to add stream level parameters and `newpart` to
466 populate it. Then call `getchunks` to retrieve all the binary chunks of
466 populate it. Then call `getchunks` to retrieve all the binary chunks of
467 data that compose the bundle2 container."""
467 data that compose the bundle2 container."""
468
468
469 _magicstring = 'HG20'
469 _magicstring = 'HG20'
470
470
471 def __init__(self, ui, capabilities=()):
471 def __init__(self, ui, capabilities=()):
472 self.ui = ui
472 self.ui = ui
473 self._params = []
473 self._params = []
474 self._parts = []
474 self._parts = []
475 self.capabilities = dict(capabilities)
475 self.capabilities = dict(capabilities)
476
476
477 @property
477 @property
478 def nbparts(self):
478 def nbparts(self):
479 """total number of parts added to the bundler"""
479 """total number of parts added to the bundler"""
480 return len(self._parts)
480 return len(self._parts)
481
481
482 # methods used to define the bundle2 content
482 # methods used to define the bundle2 content
483 def addparam(self, name, value=None):
483 def addparam(self, name, value=None):
484 """add a stream level parameter"""
484 """add a stream level parameter"""
485 if not name:
485 if not name:
486 raise ValueError('empty parameter name')
486 raise ValueError('empty parameter name')
487 if name[0] not in string.letters:
487 if name[0] not in string.letters:
488 raise ValueError('non letter first character: %r' % name)
488 raise ValueError('non letter first character: %r' % name)
489 self._params.append((name, value))
489 self._params.append((name, value))
490
490
491 def addpart(self, part):
491 def addpart(self, part):
492 """add a new part to the bundle2 container
492 """add a new part to the bundle2 container
493
493
494 Parts contain the actual application payload.
494 Parts contain the actual application payload.
495 assert part.id is None
495 assert part.id is None
496 part.id = len(self._parts) # very cheap counter
496 part.id = len(self._parts) # very cheap counter
497 self._parts.append(part)
497 self._parts.append(part)
498
498
499 def newpart(self, typeid, *args, **kwargs):
499 def newpart(self, typeid, *args, **kwargs):
500 """create a new part and add it to the containers
500 """create a new part and add it to the containers
501
501
502 The part is directly added to the container. For now, this means
502 The part is directly added to the container. For now, this means
503 that any failure to properly initialize the part after calling
503 that any failure to properly initialize the part after calling
504 ``newpart`` should result in a failure of the whole bundling process.
504 ``newpart`` should result in a failure of the whole bundling process.
505
505
506 You can still fall back to manually create and add if you need better
506 You can still fall back to manually create and add if you need better
507 control."""
507 control."""
508 part = bundlepart(typeid, *args, **kwargs)
508 part = bundlepart(typeid, *args, **kwargs)
509 self.addpart(part)
509 self.addpart(part)
510 return part
510 return part
511
511
512 # methods used to generate the bundle2 stream
512 # methods used to generate the bundle2 stream
513 def getchunks(self):
513 def getchunks(self):
514 if self.ui.debugflag:
514 if self.ui.debugflag:
515 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
515 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
516 if self._params:
516 if self._params:
517 msg.append(' (%i params)' % len(self._params))
517 msg.append(' (%i params)' % len(self._params))
518 msg.append(' %i parts total\n' % len(self._parts))
518 msg.append(' %i parts total\n' % len(self._parts))
519 self.ui.debug(''.join(msg))
519 self.ui.debug(''.join(msg))
520 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
520 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
521 yield self._magicstring
521 yield self._magicstring
522 param = self._paramchunk()
522 param = self._paramchunk()
523 outdebug(self.ui, 'bundle parameter: %s' % param)
523 outdebug(self.ui, 'bundle parameter: %s' % param)
524 yield _pack(_fstreamparamsize, len(param))
524 yield _pack(_fstreamparamsize, len(param))
525 if param:
525 if param:
526 yield param
526 yield param
527
527
528 outdebug(self.ui, 'start of parts')
528 outdebug(self.ui, 'start of parts')
529 for part in self._parts:
529 for part in self._parts:
530 outdebug(self.ui, 'bundle part: "%s"' % part.type)
530 outdebug(self.ui, 'bundle part: "%s"' % part.type)
531 for chunk in part.getchunks(ui=self.ui):
531 for chunk in part.getchunks(ui=self.ui):
532 yield chunk
532 yield chunk
533 outdebug(self.ui, 'end of bundle')
533 outdebug(self.ui, 'end of bundle')
534 yield _pack(_fpartheadersize, 0)
534 yield _pack(_fpartheadersize, 0)
535
535
536 def _paramchunk(self):
536 def _paramchunk(self):
537 """return a encoded version of all stream parameters"""
537 """return a encoded version of all stream parameters"""
538 blocks = []
538 blocks = []
539 for par, value in self._params:
539 for par, value in self._params:
540 par = urllib.quote(par)
540 par = urllib.quote(par)
541 if value is not None:
541 if value is not None:
542 value = urllib.quote(value)
542 value = urllib.quote(value)
543 par = '%s=%s' % (par, value)
543 par = '%s=%s' % (par, value)
544 blocks.append(par)
544 blocks.append(par)
545 return ' '.join(blocks)
545 return ' '.join(blocks)
546
546
547 def salvageoutput(self):
547 def salvageoutput(self):
548 """return a list with a copy of all output parts in the bundle
548 """return a list with a copy of all output parts in the bundle
549
549
550 This is meant to be used during error handling to make sure we preserve
550 This is meant to be used during error handling to make sure we preserve
551 server output"""
551 server output"""
552 salvaged = []
552 salvaged = []
553 for part in self._parts:
553 for part in self._parts:
554 if part.type.startswith('output'):
554 if part.type.startswith('output'):
555 salvaged.append(part.copy())
555 salvaged.append(part.copy())
556 return salvaged
556 return salvaged
557
557
558
558
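A hypothetical sketch of assembling a container with the class above and writing it to disk; the `ui` object is assumed to exist and the part content is made up:

    bundler = bundle20(ui)
    part = bundler.newpart('output', data='hello from bundle2\n', mandatory=False)
    part.addparam('in-reply-to', '0', mandatory=False)
    bundlefile = open('example.hg20', 'wb')
    try:
        for chunk in bundler.getchunks():
            bundlefile.write(chunk)
    finally:
        bundlefile.close()
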
559 class unpackermixin(object):
559 class unpackermixin(object):
560 """A mixin to extract bytes and struct data from a stream"""
560 """A mixin to extract bytes and struct data from a stream"""
561
561
562 def __init__(self, fp):
562 def __init__(self, fp):
563 self._fp = fp
563 self._fp = fp
564 self._seekable = (util.safehasattr(fp, 'seek') and
564 self._seekable = (util.safehasattr(fp, 'seek') and
565 util.safehasattr(fp, 'tell'))
565 util.safehasattr(fp, 'tell'))
566
566
567 def _unpack(self, format):
567 def _unpack(self, format):
568 """unpack this struct format from the stream"""
568 """unpack this struct format from the stream"""
569 data = self._readexact(struct.calcsize(format))
569 data = self._readexact(struct.calcsize(format))
570 return _unpack(format, data)
570 return _unpack(format, data)
571
571
572 def _readexact(self, size):
572 def _readexact(self, size):
573 """read exactly <size> bytes from the stream"""
573 """read exactly <size> bytes from the stream"""
574 return changegroup.readexactly(self._fp, size)
574 return changegroup.readexactly(self._fp, size)
575
575
576 def seek(self, offset, whence=0):
576 def seek(self, offset, whence=0):
577 """move the underlying file pointer"""
577 """move the underlying file pointer"""
578 if self._seekable:
578 if self._seekable:
579 return self._fp.seek(offset, whence)
579 return self._fp.seek(offset, whence)
580 else:
580 else:
581 raise NotImplementedError(_('File pointer is not seekable'))
581 raise NotImplementedError(_('File pointer is not seekable'))
582
582
583 def tell(self):
583 def tell(self):
584 """return the file offset, or None if file is not seekable"""
584 """return the file offset, or None if file is not seekable"""
585 if self._seekable:
585 if self._seekable:
586 try:
586 try:
587 return self._fp.tell()
587 return self._fp.tell()
588 except IOError, e:
588 except IOError, e:
589 if e.errno == errno.ESPIPE:
589 if e.errno == errno.ESPIPE:
590 self._seekable = False
590 self._seekable = False
591 else:
591 else:
592 raise
592 raise
593 return None
593 return None
594
594
595 def close(self):
595 def close(self):
596 """close underlying file"""
596 """close underlying file"""
597 if util.safehasattr(self._fp, 'close'):
597 if util.safehasattr(self._fp, 'close'):
598 return self._fp.close()
598 return self._fp.close()
599
599
600 def getunbundler(ui, fp, header=None):
600 def getunbundler(ui, fp, header=None):
601 """return a valid unbundler object for a given header"""
601 """return a valid unbundler object for a given header"""
602 if header is None:
602 if header is None:
603 header = changegroup.readexactly(fp, 4)
603 header = changegroup.readexactly(fp, 4)
604 magic, version = header[0:2], header[2:4]
604 magic, version = header[0:2], header[2:4]
605 if magic != 'HG':
605 if magic != 'HG':
606 raise util.Abort(_('not a Mercurial bundle'))
606 raise util.Abort(_('not a Mercurial bundle'))
607 unbundlerclass = formatmap.get(version)
607 unbundlerclass = formatmap.get(version)
608 if unbundlerclass is None:
608 if unbundlerclass is None:
609 raise util.Abort(_('unknown bundle version %s') % version)
609 raise util.Abort(_('unknown bundle version %s') % version)
610 unbundler = unbundlerclass(ui, fp)
610 unbundler = unbundlerclass(ui, fp)
611 indebug(ui, 'start processing of %s stream' % header)
611 indebug(ui, 'start processing of %s stream' % header)
612 return unbundler
612 return unbundler
613
613
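Continuing that hypothetical example, the file can be read back with getunbundler(); real consumers normally go through processbundle() instead, so this is only a low-level illustration:

    fp = open('example.hg20', 'rb')
    try:
        unbundler = getunbundler(ui, fp)
        for part in unbundler.iterparts():
            ui.write('part %r (id %d), params %r\n'
                     % (part.type, part.id, dict(part.params)))
    finally:
        fp.close()
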
614 class unbundle20(unpackermixin):
614 class unbundle20(unpackermixin):
615 """interpret a bundle2 stream
615 """interpret a bundle2 stream
616
616
617 This class is fed with a binary stream and yields parts through its
617 This class is fed with a binary stream and yields parts through its
618 `iterparts` methods."""
618 `iterparts` methods."""
619
619
620 def __init__(self, ui, fp):
620 def __init__(self, ui, fp):
621 """If header is specified, we do not read it out of the stream."""
621 """If header is specified, we do not read it out of the stream."""
622 self.ui = ui
622 self.ui = ui
623 super(unbundle20, self).__init__(fp)
623 super(unbundle20, self).__init__(fp)
624
624
625 @util.propertycache
625 @util.propertycache
626 def params(self):
626 def params(self):
627 """dictionary of stream level parameters"""
627 """dictionary of stream level parameters"""
628 indebug(self.ui, 'reading bundle2 stream parameters')
628 indebug(self.ui, 'reading bundle2 stream parameters')
629 params = {}
629 params = {}
630 paramssize = self._unpack(_fstreamparamsize)[0]
630 paramssize = self._unpack(_fstreamparamsize)[0]
631 if paramssize < 0:
631 if paramssize < 0:
632 raise error.BundleValueError('negative bundle param size: %i'
632 raise error.BundleValueError('negative bundle param size: %i'
633 % paramssize)
633 % paramssize)
634 if paramssize:
634 if paramssize:
635 for p in self._readexact(paramssize).split(' '):
635 for p in self._readexact(paramssize).split(' '):
636 p = p.split('=', 1)
636 p = p.split('=', 1)
637 p = [urllib.unquote(i) for i in p]
637 p = [urllib.unquote(i) for i in p]
638 if len(p) < 2:
638 if len(p) < 2:
639 p.append(None)
639 p.append(None)
640 self._processparam(*p)
640 self._processparam(*p)
641 params[p[0]] = p[1]
641 params[p[0]] = p[1]
642 return params
642 return params
643
643
644 def _processparam(self, name, value):
644 def _processparam(self, name, value):
645 """process a parameter, applying its effect if needed
645 """process a parameter, applying its effect if needed
646
646
647 Parameters starting with a lower case letter are advisory and will be
647 Parameters starting with a lower case letter are advisory and will be
648 ignored when unknown. Those starting with an upper case letter are
648 ignored when unknown. Those starting with an upper case letter are
649 mandatory, and this function will raise a KeyError when they are unknown.
649 mandatory, and this function will raise a KeyError when they are unknown.
650
650
651 Note: no options are currently supported. Any input will either be
651 Note: no options are currently supported. Any input will either be
652 ignored or cause a failure.
652 ignored or cause a failure.
653 """
653 """
654 if not name:
654 if not name:
655 raise ValueError('empty parameter name')
655 raise ValueError('empty parameter name')
656 if name[0] not in string.letters:
656 if name[0] not in string.letters:
657 raise ValueError('non letter first character: %r' % name)
657 raise ValueError('non letter first character: %r' % name)
658 # Some logic will be later added here to try to process the option for
658 # Some logic will be later added here to try to process the option for
659 # a dict of known parameter.
659 # a dict of known parameter.
660 if name[0].islower():
660 if name[0].islower():
661 indebug(self.ui, "ignoring unknown parameter %r" % name)
661 indebug(self.ui, "ignoring unknown parameter %r" % name)
662 else:
662 else:
663 raise error.UnsupportedPartError(params=(name,))
663 raise error.UnsupportedPartError(params=(name,))
664
664
665
665
666 def iterparts(self):
666 def iterparts(self):
667 """yield all parts contained in the stream"""
667 """yield all parts contained in the stream"""
668 # make sure param have been loaded
668 # make sure param have been loaded
669 self.params
669 self.params
670 indebug(self.ui, 'start extraction of bundle2 parts')
670 indebug(self.ui, 'start extraction of bundle2 parts')
671 headerblock = self._readpartheader()
671 headerblock = self._readpartheader()
672 while headerblock is not None:
672 while headerblock is not None:
673 part = unbundlepart(self.ui, headerblock, self._fp)
673 part = unbundlepart(self.ui, headerblock, self._fp)
674 yield part
674 yield part
675 part.seek(0, 2)
675 part.seek(0, 2)
676 headerblock = self._readpartheader()
676 headerblock = self._readpartheader()
677 indebug(self.ui, 'end of bundle2 stream')
677 indebug(self.ui, 'end of bundle2 stream')
678
678
679 def _readpartheader(self):
679 def _readpartheader(self):
680 """reads a part header size and return the bytes blob
680 """reads a part header size and return the bytes blob
681
681
682 returns None if empty"""
682 returns None if empty"""
683 headersize = self._unpack(_fpartheadersize)[0]
683 headersize = self._unpack(_fpartheadersize)[0]
684 if headersize < 0:
684 if headersize < 0:
685 raise error.BundleValueError('negative part header size: %i'
685 raise error.BundleValueError('negative part header size: %i'
686 % headersize)
686 % headersize)
687 indebug(self.ui, 'part header size: %i' % headersize)
687 indebug(self.ui, 'part header size: %i' % headersize)
688 if headersize:
688 if headersize:
689 return self._readexact(headersize)
689 return self._readexact(headersize)
690 return None
690 return None
691
691
692 def compressed(self):
692 def compressed(self):
693 return False
693 return False
694
694
695 formatmap = {'20': unbundle20}
695 formatmap = {'20': unbundle20}
696
696
697 class bundlepart(object):
697 class bundlepart(object):
698 """A bundle2 part contains application level payload
698 """A bundle2 part contains application level payload
699
699
700 The part `type` is used to route the part to the application level
700 The part `type` is used to route the part to the application level
701 handler.
701 handler.
702
702
703 The part payload is contained in ``part.data``. It could be raw bytes or a
703 The part payload is contained in ``part.data``. It could be raw bytes or a
704 generator of byte chunks.
704 generator of byte chunks.
705
705
706 You can add parameters to the part using the ``addparam`` method.
706 You can add parameters to the part using the ``addparam`` method.
707 Parameters can be either mandatory (default) or advisory. Remote side
707 Parameters can be either mandatory (default) or advisory. Remote side
708 should be able to safely ignore the advisory ones.
708 should be able to safely ignore the advisory ones.
709
709
710 Neither data nor parameters can be modified after the generation has begun.
710 Neither data nor parameters can be modified after the generation has begun.
711 """
711 """
712
712
713 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
713 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
714 data='', mandatory=True):
714 data='', mandatory=True):
715 validateparttype(parttype)
715 validateparttype(parttype)
716 self.id = None
716 self.id = None
717 self.type = parttype
717 self.type = parttype
718 self._data = data
718 self._data = data
719 self._mandatoryparams = list(mandatoryparams)
719 self._mandatoryparams = list(mandatoryparams)
720 self._advisoryparams = list(advisoryparams)
720 self._advisoryparams = list(advisoryparams)
721 # checking for duplicated entries
721 # checking for duplicated entries
722 self._seenparams = set()
722 self._seenparams = set()
723 for pname, __ in self._mandatoryparams + self._advisoryparams:
723 for pname, __ in self._mandatoryparams + self._advisoryparams:
724 if pname in self._seenparams:
724 if pname in self._seenparams:
725 raise RuntimeError('duplicated params: %s' % pname)
725 raise RuntimeError('duplicated params: %s' % pname)
726 self._seenparams.add(pname)
726 self._seenparams.add(pname)
727 # status of the part's generation:
727 # status of the part's generation:
728 # - None: not started,
728 # - None: not started,
729 # - False: currently generated,
729 # - False: currently generated,
730 # - True: generation done.
730 # - True: generation done.
731 self._generated = None
731 self._generated = None
732 self.mandatory = mandatory
732 self.mandatory = mandatory
733
733
734 def copy(self):
734 def copy(self):
735 """return a copy of the part
735 """return a copy of the part
736
736
737 The new part have the very same content but no partid assigned yet.
737 The new part have the very same content but no partid assigned yet.
738 Parts with generated data cannot be copied."""
738 Parts with generated data cannot be copied."""
739 assert not util.safehasattr(self.data, 'next')
739 assert not util.safehasattr(self.data, 'next')
740 return self.__class__(self.type, self._mandatoryparams,
740 return self.__class__(self.type, self._mandatoryparams,
741 self._advisoryparams, self._data, self.mandatory)
741 self._advisoryparams, self._data, self.mandatory)
742
742
743 # methods used to define the part content
743 # methods used to define the part content
744 def __setdata(self, data):
744 def __setdata(self, data):
745 if self._generated is not None:
745 if self._generated is not None:
746 raise error.ReadOnlyPartError('part is being generated')
746 raise error.ReadOnlyPartError('part is being generated')
747 self._data = data
747 self._data = data
748 def __getdata(self):
748 def __getdata(self):
749 return self._data
749 return self._data
750 data = property(__getdata, __setdata)
750 data = property(__getdata, __setdata)
751
751
752 @property
752 @property
753 def mandatoryparams(self):
753 def mandatoryparams(self):
754 # make it an immutable tuple to force people through ``addparam``
754 # make it an immutable tuple to force people through ``addparam``
755 return tuple(self._mandatoryparams)
755 return tuple(self._mandatoryparams)
756
756
757 @property
757 @property
758 def advisoryparams(self):
758 def advisoryparams(self):
759 # make it an immutable tuple to force people through ``addparam``
759 # make it an immutable tuple to force people through ``addparam``
760 return tuple(self._advisoryparams)
760 return tuple(self._advisoryparams)
761
761
762 def addparam(self, name, value='', mandatory=True):
762 def addparam(self, name, value='', mandatory=True):
763 if self._generated is not None:
763 if self._generated is not None:
764 raise error.ReadOnlyPartError('part is being generated')
764 raise error.ReadOnlyPartError('part is being generated')
765 if name in self._seenparams:
765 if name in self._seenparams:
766 raise ValueError('duplicated params: %s' % name)
766 raise ValueError('duplicated params: %s' % name)
767 self._seenparams.add(name)
767 self._seenparams.add(name)
768 params = self._advisoryparams
768 params = self._advisoryparams
769 if mandatory:
769 if mandatory:
770 params = self._mandatoryparams
770 params = self._mandatoryparams
771 params.append((name, value))
771 params.append((name, value))
772
772
773 # methods used to generate the bundle2 stream
773 # methods used to generate the bundle2 stream
774 def getchunks(self, ui):
774 def getchunks(self, ui):
775 if self._generated is not None:
775 if self._generated is not None:
776 raise RuntimeError('part can only be consumed once')
776 raise RuntimeError('part can only be consumed once')
777 self._generated = False
777 self._generated = False
778
778
779 if ui.debugflag:
779 if ui.debugflag:
780 msg = ['bundle2-output-part: "%s"' % self.type]
780 msg = ['bundle2-output-part: "%s"' % self.type]
781 if not self.mandatory:
781 if not self.mandatory:
782 msg.append(' (advisory)')
782 msg.append(' (advisory)')
783 nbmp = len(self.mandatoryparams)
783 nbmp = len(self.mandatoryparams)
784 nbap = len(self.advisoryparams)
784 nbap = len(self.advisoryparams)
785 if nbmp or nbap:
785 if nbmp or nbap:
786 msg.append(' (params:')
786 msg.append(' (params:')
787 if nbmp:
787 if nbmp:
788 msg.append(' %i mandatory' % nbmp)
788 msg.append(' %i mandatory' % nbmp)
789 if nbap:
789 if nbap:
790 msg.append(' %i advisory' % nbap)
790 msg.append(' %i advisory' % nbap)
791 msg.append(')')
791 msg.append(')')
792 if not self.data:
792 if not self.data:
793 msg.append(' empty payload')
793 msg.append(' empty payload')
794 elif util.safehasattr(self.data, 'next'):
794 elif util.safehasattr(self.data, 'next'):
795 msg.append(' streamed payload')
795 msg.append(' streamed payload')
796 else:
796 else:
797 msg.append(' %i bytes payload' % len(self.data))
797 msg.append(' %i bytes payload' % len(self.data))
798 msg.append('\n')
798 msg.append('\n')
799 ui.debug(''.join(msg))
799 ui.debug(''.join(msg))
800
800
801 #### header
801 #### header
802 if self.mandatory:
802 if self.mandatory:
803 parttype = self.type.upper()
803 parttype = self.type.upper()
804 else:
804 else:
805 parttype = self.type.lower()
805 parttype = self.type.lower()
806 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
806 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
807 ## parttype
807 ## parttype
808 header = [_pack(_fparttypesize, len(parttype)),
808 header = [_pack(_fparttypesize, len(parttype)),
809 parttype, _pack(_fpartid, self.id),
809 parttype, _pack(_fpartid, self.id),
810 ]
810 ]
811 ## parameters
811 ## parameters
812 # count
812 # count
813 manpar = self.mandatoryparams
813 manpar = self.mandatoryparams
814 advpar = self.advisoryparams
814 advpar = self.advisoryparams
815 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
815 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
816 # size
816 # size
817 parsizes = []
817 parsizes = []
818 for key, value in manpar:
818 for key, value in manpar:
819 parsizes.append(len(key))
819 parsizes.append(len(key))
820 parsizes.append(len(value))
820 parsizes.append(len(value))
821 for key, value in advpar:
821 for key, value in advpar:
822 parsizes.append(len(key))
822 parsizes.append(len(key))
823 parsizes.append(len(value))
823 parsizes.append(len(value))
824 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
824 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
825 header.append(paramsizes)
825 header.append(paramsizes)
826 # key, value
826 # key, value
827 for key, value in manpar:
827 for key, value in manpar:
828 header.append(key)
828 header.append(key)
829 header.append(value)
829 header.append(value)
830 for key, value in advpar:
830 for key, value in advpar:
831 header.append(key)
831 header.append(key)
832 header.append(value)
832 header.append(value)
833 ## finalize header
833 ## finalize header
834 headerchunk = ''.join(header)
834 headerchunk = ''.join(header)
835 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
835 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
836 yield _pack(_fpartheadersize, len(headerchunk))
836 yield _pack(_fpartheadersize, len(headerchunk))
837 yield headerchunk
837 yield headerchunk
838 ## payload
838 ## payload
839 try:
839 try:
840 for chunk in self._payloadchunks():
840 for chunk in self._payloadchunks():
841 outdebug(ui, 'payload chunk size: %i' % len(chunk))
841 outdebug(ui, 'payload chunk size: %i' % len(chunk))
842 yield _pack(_fpayloadsize, len(chunk))
842 yield _pack(_fpayloadsize, len(chunk))
843 yield chunk
843 yield chunk
844 except BaseException, exc:
844 except BaseException, exc:
845 # backup exception data for later
845 # backup exception data for later
846 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
846 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
847 % exc)
847 % exc)
848 exc_info = sys.exc_info()
848 exc_info = sys.exc_info()
849 msg = 'unexpected error: %s' % exc
849 msg = 'unexpected error: %s' % exc
850 interpart = bundlepart('error:abort', [('message', msg)],
850 interpart = bundlepart('error:abort', [('message', msg)],
851 mandatory=False)
851 mandatory=False)
852 interpart.id = 0
852 interpart.id = 0
853 yield _pack(_fpayloadsize, -1)
853 yield _pack(_fpayloadsize, -1)
854 for chunk in interpart.getchunks(ui=ui):
854 for chunk in interpart.getchunks(ui=ui):
855 yield chunk
855 yield chunk
856 outdebug(ui, 'closing payload chunk')
856 outdebug(ui, 'closing payload chunk')
857 # abort current part payload
857 # abort current part payload
858 yield _pack(_fpayloadsize, 0)
858 yield _pack(_fpayloadsize, 0)
859 raise exc_info[0], exc_info[1], exc_info[2]
859 raise exc_info[0], exc_info[1], exc_info[2]
860 # end of payload
860 # end of payload
861 outdebug(ui, 'closing payload chunk')
861 outdebug(ui, 'closing payload chunk')
862 yield _pack(_fpayloadsize, 0)
862 yield _pack(_fpayloadsize, 0)
863 self._generated = True
863 self._generated = True
864
864
865 def _payloadchunks(self):
865 def _payloadchunks(self):
866 """yield chunks of a the part payload
866 """yield chunks of a the part payload
867
867
868 Exists to handle the different methods to provide data to a part."""
868 Exists to handle the different methods to provide data to a part."""
869 # we only support fixed size data now.
869 # we only support fixed size data now.
870 # This will be improved in the future.
870 # This will be improved in the future.
871 if util.safehasattr(self.data, 'next'):
871 if util.safehasattr(self.data, 'next'):
872 buff = util.chunkbuffer(self.data)
872 buff = util.chunkbuffer(self.data)
873 chunk = buff.read(preferedchunksize)
873 chunk = buff.read(preferedchunksize)
874 while chunk:
874 while chunk:
875 yield chunk
875 yield chunk
876 chunk = buff.read(preferedchunksize)
876 chunk = buff.read(preferedchunksize)
877 elif len(self.data):
877 elif len(self.data):
878 yield self.data
878 yield self.data
879
879
880
880
flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting an exception raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads the part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise RuntimeError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] #(payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic-related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = dict(self.mandatoryparams)
        self.params.update(dict(self.advisoryparams))
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, super(unbundlepart, self).tell()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            super(unbundlepart, self).seek(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos,
                                             super(unbundlepart, self).tell()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def _readheader(self):
        """read the header and set up the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % self.id)
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

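    # Illustrative sketch of what _readheader() above decodes: the header
    # blob starts with the size of the part type string, then the type
    # itself (upper case marks the part as mandatory, e.g. "CHECK:HEADS"
    # versus "check:heads"), the part id, the mandatory/advisory parameter
    # counts, the key/value sizes, and finally the key and value strings in
    # the same order the sizes were listed.
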
    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise util.Abort(_('Seek failed\n'))
            self._pos = newpos

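# Usage sketch for unbundlepart seeking (illustrative): a handler that needs
# to re-read a payload can drain the part to learn its size, then rewind:
#
#   inpart.seek(0, 2)        # read to the end, indexing every chunk
#   total = inpart.tell()
#   inpart.seek(0)           # rewind to the start of the payload
#   data = inpart.read(total)
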
# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    return caps

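# Usage sketch (illustrative values): the resulting dict maps capability
# names to tuples of supported values, for example
#
#   getrepocaps(repo, allowpushback=True)
#   # -> {'HG20': (), 'changegroup': ('01', '02'), 'pushback': (), ...}
#
# A server typically encodes and urlquotes this blob into its 'bundle2'
# wire-protocol capability; bundle2caps() below reverses that.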
def bundle2caps(remote):
    """return the bundle2 capabilities of a peer as a dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urllib.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

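# Example (illustrative): obsmarkersversion({'obsmarkers': ('V0', 'V1')})
# returns [0, 1]; an empty list means no common obsmarkers format.
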
@parthandler('changegroup', ('version',))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will need massive rework before
    being inflicted on any end-user.
    """
    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    unpacker = changegroup.packermap[unpackerversion][1]
    cg = unpacker(inpart, 'UN')
    # the source and url passed here are overwritten by the ones contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

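# Sender-side sketch (illustrative; 'bundler' is a hypothetical bundle20
# instance and 'cg' a changegroup packer): the 'version' parameter selects
# which unpacker from changegroup.packermap the handler above picks.
#
#   part = bundler.newpart('changegroup', data=cg.getchunks())
#   part.addparam('version', '02')
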
_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
    ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given a url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate that what was
        retrieved by the client matches the server's knowledge about the
        bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest
        with that name. Like the size, it is used to validate that what was
        retrieved by the client matches what the server knows about the
        bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise util.Abort(_('remote-changegroup does not support %s urls') %
                         parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
                         % 'size')
    except KeyError:
        raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise util.Abort(_('remote-changegroup: missing "%s" param') %
                             param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise util.Abort(_('%s: not a bundle version 1.0') %
                         util.hidepassword(raw_url))
    ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except util.Abort, e:
        raise util.Abort(_('bundle at %s is corrupted:\n%s') %
                         (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

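# Sender-side sketch (illustrative; 'bundler', the url, the size and the
# digest value are placeholders): advertising an externally hosted bundle10
# so the receiver can fetch and verify it itself.
#
#   part = bundler.newpart('remote-changegroup')
#   part.addparam('url', 'https://example.com/pull.hg')
#   part.addparam('size', '12345')
#   part.addparam('digests', 'sha1')
#   part.addparam('digest:sha1', sha1hex)
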
@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    if heads != op.repo.heads():
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

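# Sender-side sketch (illustrative; 'remoteheads' is a placeholder for the
# heads the client saw when it started the push): the payload is simply the
# concatenation of the 20-byte binary head nodes.
#
#   bundler.newpart('check:heads', data=''.join(remoteheads))
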
@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(('remote: %s\n' % line))

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

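# Sender-side sketch (illustrative; 'capsblob' stands for an encoded
# capabilities blob): a client that wants a reply bundle sends its own
# capabilities as an early part of the bundle it pushes.
#
#   bundler.newpart('replycaps', data=capsblob)
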
@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.UnsupportedPartError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

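# Sender-side sketch (illustrative, assuming the encodekeys counterpart of
# the decodekeys call above): a 'listkeys' part names the namespace as a
# parameter and ships the encoded key/value pairs as payload.
#
#   part = bundler.newpart('listkeys', data=pushkey.encodekeys(keys))
#   part.addparam('namespace', 'phases')
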
@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

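# Sender-side sketch (illustrative, assuming a pushkey.encode counterpart of
# the pushkey.decode helper used above; 'oldhex' and 'newhex' are
# placeholders): pushing a bookmark move as a mandatory pushkey part.
#
#   enc = pushkey.encode
#   part = bundler.newpart('pushkey')
#   part.addparam('namespace', enc('bookmarks'))
#   part.addparam('key', enc('@'))
#   part.addparam('old', enc(oldhex))
#   part.addparam('new', enc(newhex))
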
@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('received incomplete .hgtags fnodes data, ignoring\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
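
# Sender-side sketch (illustrative; 'entries' and 'bundler' are
# placeholders): the payload is a flat sequence of 40-byte records, each a
# 20-byte changeset node immediately followed by the 20-byte .hgtags
# filenode for that changeset, with no count or terminator.
#
#   data = ''.join(node + fnode for node, fnode in entries)
#   bundler.newpart('hgtagsfnodes', data=data)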