bundle2: localize handleoutput remote prompts...
Akihiko Odaki
r29851:ccd436f7 3.9.1 stable
@@ -1,1618 +1,1618 @@
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is architectured as follows

- magic string
- stream level parameters
- payload parts (any number)
- end of stream marker.

the Binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

Binary format is as follows

:params size: int32

  The total number of Bytes used by the parameters

:params value: arbitrary number of Bytes

  A blob of `params size` containing the serialized version of all stream level
  parameters.

  The blob contains a space separated list of parameters. Parameters with value
  are stored in the form `<name>=<value>`. Both name and value are urlquoted.

  Empty names are obviously forbidden.

  Name MUST start with a letter. If this first letter is lower case, the
  parameter is advisory and can be safely ignored. However when the first
  letter is capital, the parameter is mandatory and the bundling process MUST
  stop if it is not able to process it.

  Stream parameters use a simple textual format for two main reasons:

  - Stream level parameters should remain simple and we want to discourage any
    crazy usage.
  - Textual data allow easy human inspection of a bundle2 header in case of
    troubles.

  Any Applicative level options MUST go into a bundle2 part instead.

Payload part
------------------------

Binary format is as follows

:header size: int32

  The total number of Bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

  The header defines how to interpret the part. It contains two pieces of
  data: the part type, and the part parameters.

  The part type is used to route to an application level handler that can
  interpret the payload.

  Part parameters are passed to the application level handler. They are
  meant to convey information that will help the application level object to
  interpret the part payload.

  The binary format of the header is as follows

  :typesize: (one byte)

  :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

  :partid: A 32bits integer (unique in the bundle) that can be used to refer
           to this part.

  :parameters:

      Part's parameter may have arbitrary content, the binary structure is::

          <mandatory-count><advisory-count><param-sizes><param-data>

      :mandatory-count: 1 byte, number of mandatory parameters

      :advisory-count: 1 byte, number of advisory parameters

      :param-sizes:

          N couple of bytes, where N is the total number of parameters. Each
          couple contains (<size-of-key>, <size-of-value>) for one parameter.

      :param-data:

          A blob of bytes from which each parameter key and value can be
          retrieved using the list of size couples stored in the previous
          field.

          Mandatory parameters come first, then the advisory ones.

          Each parameter's key MUST be unique within the part.

:payload:

    payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as much as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are registered
for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the Part type
contains any uppercase char it is considered mandatory. When no handler is
known for a Mandatory part, the process is aborted and an exception is raised.
If the part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read after an abort are processed. In the
future, dropping the stream may become an option for channels we do not care to
preserve.
"""

from __future__ import absolute_import

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    pushkey,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
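
# --- Illustrative sketch (editor addition, not part of bundle2.py): decode the
# stream level parameter block exactly as the format docstring above describes
# it. `fp` is assumed to be a file-like object positioned just after the 'HG20'
# magic string; the helper name is hypothetical.
def _sketch_readstreamparams(fp):
    header = changegroup.readexactly(fp, struct.calcsize(_fstreamparamsize))
    size = _unpack(_fstreamparamsize, header)[0]
    blob = changegroup.readexactly(fp, size) if size else ''
    params = {}
    for entry in blob.split(' ') if blob else []:
        if '=' in entry:
            name, value = entry.split('=', 1)
        else:
            name, value = entry, None
        # a lower case first letter means advisory, upper case means mandatory
        params[urlreq.unquote(name)] = value and urlreq.unquote(value)
    return params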

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>'+('BB'*nbparams)
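
# Illustrative note (editor addition): for two parameters the format is '>BBBB',
# i.e. one (size-of-key, size-of-value) byte couple per parameter, matching the
# ':param-sizes:' field documented in the module docstring. For example:
#
#   fmt = _makefpartparamsizes(2)            # '>BBBB'
#   struct.unpack(fmt, '\x02\x05\x03\x00')   # -> (2, 5, 3, 0)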

parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)
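
    # Illustrative usage sketch (editor addition, hypothetical values):
    #
    #   records = unbundlerecords()
    #   records.add('changegroup', {'return': 1})
    #   records['changegroup']   # -> ({'return': 1},)
    #   list(records)            # -> [('changegroup', {'return': 1})]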

class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.gettransaction = transactiongetter
        self.reply = None
        self.captureoutput = captureoutput

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    tr.hookargs['bundle2'] = '1'
    if source is not None and 'source' not in tr.hookargs:
        tr.hookargs['source'] = source
    if url is not None and 'url' not in tr.hookargs:
        tr.hookargs['url'] = url
    return processbundle(repo, unbundler, lambda: tr, op=op)

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying its effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    This is a very early version of this function that will be strongly
    reworked before final usage.

    Unknown Mandatory parts will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op.gettransaction is None:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except Exception as exc:
        for nbpart, part in iterparts:
            # consume the bundle content
            part.seek(0, 2)
        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from processing the old format. This is mostly
        # needed to handle different return codes to unbundle according to the
        # type of bundle. We should probably clean up or drop this return code
        # craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged
        raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op
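
# --- Illustrative caller sketch (editor addition): a simplified version of what
# callers such as exchange.py do with this API -- wrap the unbundling in a
# repository transaction and let applybundle() drive the part handlers. The
# function name and the 'source'/'url' values are hypothetical.
def _sketch_applyfrompeer(repo, fp, url):
    unbundler = getunbundler(repo.ui, fp)
    tr = repo.transaction('unbundle')
    try:
        op = applybundle(repo, unbundler, tr, source='push', url=url)
        tr.close()
    finally:
        tr.release()
    return op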

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    hardabort = False
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                      params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam('in-reply-to', str(part.id), mandatory=False)
    # If exiting or interrupted, do not attempt to seek the stream in the
    # finally block below. This makes abort faster.
    except (SystemExit, KeyboardInterrupt):
        hardabort = True
        raise
    finally:
        # consume the part content to not corrupt the stream.
        if not hardabort:
            part.seek(0, 2)


def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line).
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)
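
# Illustrative round-trip (editor addition): encodecaps and decodecaps are
# inverses on a capabilities mapping; decoded values always come back as lists.
#
#   blob = encodecaps({'HG20': (), 'changegroup': ['01', '02']})
#   # -> 'HG20\nchangegroup=01,02'
#   decodecaps(blob)
#   # -> {'HG20': [], 'changegroup': ['01', '02']}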

bundletypes = {
    "": ("", None),       # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", None),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add a stream level parameter and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compressor = util.compressors[None]()

    def setcompression(self, alg):
        """setup core part compression to <alg>"""
        if alg is None:
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compressor = util.compressors[alg]()

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        # starting compression
        for chunk in self._getcorechunk():
            yield self._compressor.compress(chunk)
        yield self._compressor.flush()

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we preserve
        server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged
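
# --- Illustrative producer sketch (editor addition): build a minimal bundle2
# container and stream it to a file object. The part type, payload and the
# `out` argument are hypothetical; real producers live in places like
# exchange.py.
def _sketch_writebundle(ui, out):
    bundler = bundle20(ui)
    bundler.newpart('output', data='hello from a sketch\n', mandatory=False)
    for chunk in bundler.getchunks():
        out.write(chunk)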


class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))

    def _unpack(self, format):
        """unpack this struct format from the stream"""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream"""
        return changegroup.readexactly(self._fp, size)

    def seek(self, offset, whence=0):
        """move the underlying file pointer"""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def tell(self):
        """return the file offset, or None if file is not seekable"""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler
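
# Illustrative consumer sketch (editor addition): walk the parts of a bundle2
# file on disk without processing them. The path is hypothetical; note that
# iterparts() consumes each part as it advances.
#
#   fp = open('dump.hg20', 'rb')
#   unbundler = getunbundler(ui, fp)
#   for part in unbundler.iterparts():
#       ui.write('%s (%i params)\n' % (part.type, len(part.params)))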

class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` methods."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._decompressor = util.decompressors[None]
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory, and this function will raise a KeyError when unknown.

        Note: no options are currently supported. Any input will either be
        ignored or fail.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends, relying on the bundle
        being interpreted to find its end. This is terrible and we are sorry,
        but we needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._decompressor is util.decompressors[None]
        # From there, payload might need to be decompressed
        self._fp = self._decompressor(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)


    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._decompressor(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in formatmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.decompressors:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._decompressor = util.decompressors[value]
    if value is not None:
        unbundler._compressed = True
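
# Illustrative sketch (editor addition): how a handler for a new stream level
# parameter could be registered. The parameter name is hypothetical; keeping it
# lower case makes it advisory, so peers that do not know it simply ignore it.
#
#   @b2streamparamhandler('myadvisoryparam')
#   def processmyadvisoryparam(unbundler, param, value):
#       indebug(unbundler.ui, 'saw stream parameter %s=%r' % (param, value))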
828
828
829 class bundlepart(object):
829 class bundlepart(object):
830 """A bundle2 part contains application level payload
830 """A bundle2 part contains application level payload
831
831
832 The part `type` is used to route the part to the application level
832 The part `type` is used to route the part to the application level
833 handler.
833 handler.
834
834
835 The part payload is contained in ``part.data``. It could be raw bytes or a
835 The part payload is contained in ``part.data``. It could be raw bytes or a
836 generator of byte chunks.
836 generator of byte chunks.
837
837
838 You can add parameters to the part using the ``addparam`` method.
838 You can add parameters to the part using the ``addparam`` method.
839 Parameters can be either mandatory (default) or advisory. Remote side
839 Parameters can be either mandatory (default) or advisory. Remote side
840 should be able to safely ignore the advisory ones.
840 should be able to safely ignore the advisory ones.
841
841
842 Both data and parameters cannot be modified after the generation has begun.
842 Both data and parameters cannot be modified after the generation has begun.
843 """
843 """
844
844
845 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
845 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
846 data='', mandatory=True):
846 data='', mandatory=True):
847 validateparttype(parttype)
847 validateparttype(parttype)
848 self.id = None
848 self.id = None
849 self.type = parttype
849 self.type = parttype
850 self._data = data
850 self._data = data
851 self._mandatoryparams = list(mandatoryparams)
851 self._mandatoryparams = list(mandatoryparams)
852 self._advisoryparams = list(advisoryparams)
852 self._advisoryparams = list(advisoryparams)
853 # checking for duplicated entries
853 # checking for duplicated entries
854 self._seenparams = set()
854 self._seenparams = set()
855 for pname, __ in self._mandatoryparams + self._advisoryparams:
855 for pname, __ in self._mandatoryparams + self._advisoryparams:
856 if pname in self._seenparams:
856 if pname in self._seenparams:
857 raise RuntimeError('duplicated params: %s' % pname)
857 raise RuntimeError('duplicated params: %s' % pname)
858 self._seenparams.add(pname)
858 self._seenparams.add(pname)
859 # status of the part's generation:
859 # status of the part's generation:
860 # - None: not started,
860 # - None: not started,
861 # - False: currently generated,
861 # - False: currently generated,
862 # - True: generation done.
862 # - True: generation done.
863 self._generated = None
863 self._generated = None
864 self.mandatory = mandatory
864 self.mandatory = mandatory
865
865
866 def copy(self):
866 def copy(self):
867 """return a copy of the part
867 """return a copy of the part
868
868
869 The new part have the very same content but no partid assigned yet.
869 The new part have the very same content but no partid assigned yet.
870 Parts with generated data cannot be copied."""
870 Parts with generated data cannot be copied."""
871 assert not util.safehasattr(self.data, 'next')
871 assert not util.safehasattr(self.data, 'next')
872 return self.__class__(self.type, self._mandatoryparams,
872 return self.__class__(self.type, self._mandatoryparams,
873 self._advisoryparams, self._data, self.mandatory)
873 self._advisoryparams, self._data, self.mandatory)
874
874
875 # methods used to define the part content
875 # methods used to define the part content
876 @property
876 @property
877 def data(self):
877 def data(self):
878 return self._data
878 return self._data
879
879
880 @data.setter
880 @data.setter
881 def data(self, data):
881 def data(self, data):
882 if self._generated is not None:
882 if self._generated is not None:
883 raise error.ReadOnlyPartError('part is being generated')
883 raise error.ReadOnlyPartError('part is being generated')
884 self._data = data
884 self._data = data
885
885
886 @property
886 @property
887 def mandatoryparams(self):
887 def mandatoryparams(self):
888 # make it an immutable tuple to force people through ``addparam``
888 # make it an immutable tuple to force people through ``addparam``
889 return tuple(self._mandatoryparams)
889 return tuple(self._mandatoryparams)
890
890
891 @property
891 @property
892 def advisoryparams(self):
892 def advisoryparams(self):
893 # make it an immutable tuple to force people through ``addparam``
893 # make it an immutable tuple to force people through ``addparam``
894 return tuple(self._advisoryparams)
894 return tuple(self._advisoryparams)
895
895
896 def addparam(self, name, value='', mandatory=True):
896 def addparam(self, name, value='', mandatory=True):
897 if self._generated is not None:
897 if self._generated is not None:
898 raise error.ReadOnlyPartError('part is being generated')
898 raise error.ReadOnlyPartError('part is being generated')
899 if name in self._seenparams:
899 if name in self._seenparams:
900 raise ValueError('duplicated params: %s' % name)
900 raise ValueError('duplicated params: %s' % name)
901 self._seenparams.add(name)
901 self._seenparams.add(name)
902 params = self._advisoryparams
902 params = self._advisoryparams
903 if mandatory:
903 if mandatory:
904 params = self._mandatoryparams
904 params = self._mandatoryparams
905 params.append((name, value))
905 params.append((name, value))
906
906
907 # methods used to generate the bundle2 stream
907 # methods used to generate the bundle2 stream
908 def getchunks(self, ui):
908 def getchunks(self, ui):
909 if self._generated is not None:
909 if self._generated is not None:
910 raise RuntimeError('part can only be consumed once')
910 raise RuntimeError('part can only be consumed once')
911 self._generated = False
911 self._generated = False
912
912
913 if ui.debugflag:
913 if ui.debugflag:
914 msg = ['bundle2-output-part: "%s"' % self.type]
914 msg = ['bundle2-output-part: "%s"' % self.type]
915 if not self.mandatory:
915 if not self.mandatory:
916 msg.append(' (advisory)')
916 msg.append(' (advisory)')
917 nbmp = len(self.mandatoryparams)
917 nbmp = len(self.mandatoryparams)
918 nbap = len(self.advisoryparams)
918 nbap = len(self.advisoryparams)
919 if nbmp or nbap:
919 if nbmp or nbap:
920 msg.append(' (params:')
920 msg.append(' (params:')
921 if nbmp:
921 if nbmp:
922 msg.append(' %i mandatory' % nbmp)
922 msg.append(' %i mandatory' % nbmp)
923 if nbap:
923 if nbap:
924 msg.append(' %i advisory' % nbmp)
924 msg.append(' %i advisory' % nbmp)
925 msg.append(')')
925 msg.append(')')
926 if not self.data:
926 if not self.data:
927 msg.append(' empty payload')
927 msg.append(' empty payload')
928 elif util.safehasattr(self.data, 'next'):
928 elif util.safehasattr(self.data, 'next'):
929 msg.append(' streamed payload')
929 msg.append(' streamed payload')
930 else:
930 else:
931 msg.append(' %i bytes payload' % len(self.data))
931 msg.append(' %i bytes payload' % len(self.data))
932 msg.append('\n')
932 msg.append('\n')
933 ui.debug(''.join(msg))
933 ui.debug(''.join(msg))
934
934
935 #### header
935 #### header
936 if self.mandatory:
936 if self.mandatory:
937 parttype = self.type.upper()
937 parttype = self.type.upper()
938 else:
938 else:
939 parttype = self.type.lower()
939 parttype = self.type.lower()
940 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
940 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
941 ## parttype
941 ## parttype
942 header = [_pack(_fparttypesize, len(parttype)),
942 header = [_pack(_fparttypesize, len(parttype)),
943 parttype, _pack(_fpartid, self.id),
943 parttype, _pack(_fpartid, self.id),
944 ]
944 ]
945 ## parameters
945 ## parameters
946 # count
946 # count
947 manpar = self.mandatoryparams
947 manpar = self.mandatoryparams
948 advpar = self.advisoryparams
948 advpar = self.advisoryparams
949 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
949 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
950 # size
950 # size
951 parsizes = []
951 parsizes = []
952 for key, value in manpar:
952 for key, value in manpar:
953 parsizes.append(len(key))
953 parsizes.append(len(key))
954 parsizes.append(len(value))
954 parsizes.append(len(value))
955 for key, value in advpar:
955 for key, value in advpar:
956 parsizes.append(len(key))
956 parsizes.append(len(key))
957 parsizes.append(len(value))
957 parsizes.append(len(value))
958 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
958 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
959 header.append(paramsizes)
959 header.append(paramsizes)
960 # key, value
960 # key, value
961 for key, value in manpar:
961 for key, value in manpar:
962 header.append(key)
962 header.append(key)
963 header.append(value)
963 header.append(value)
964 for key, value in advpar:
964 for key, value in advpar:
965 header.append(key)
965 header.append(key)
966 header.append(value)
966 header.append(value)
967 ## finalize header
967 ## finalize header
968 headerchunk = ''.join(header)
968 headerchunk = ''.join(header)
969 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
969 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
970 yield _pack(_fpartheadersize, len(headerchunk))
970 yield _pack(_fpartheadersize, len(headerchunk))
971 yield headerchunk
971 yield headerchunk
972 ## payload
972 ## payload
973 try:
973 try:
974 for chunk in self._payloadchunks():
974 for chunk in self._payloadchunks():
975 outdebug(ui, 'payload chunk size: %i' % len(chunk))
975 outdebug(ui, 'payload chunk size: %i' % len(chunk))
976 yield _pack(_fpayloadsize, len(chunk))
976 yield _pack(_fpayloadsize, len(chunk))
977 yield chunk
977 yield chunk
978 except GeneratorExit:
978 except GeneratorExit:
979 # GeneratorExit means that nobody is listening for our
979 # GeneratorExit means that nobody is listening for our
980 # results anyway, so just bail quickly rather than trying
980 # results anyway, so just bail quickly rather than trying
981 # to produce an error part.
981 # to produce an error part.
982 ui.debug('bundle2-generatorexit\n')
982 ui.debug('bundle2-generatorexit\n')
983 raise
983 raise
984 except BaseException as exc:
984 except BaseException as exc:
985 # backup exception data for later
985 # backup exception data for later
986 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
986 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
987 % exc)
987 % exc)
988 exc_info = sys.exc_info()
988 exc_info = sys.exc_info()
989 msg = 'unexpected error: %s' % exc
989 msg = 'unexpected error: %s' % exc
990 interpart = bundlepart('error:abort', [('message', msg)],
990 interpart = bundlepart('error:abort', [('message', msg)],
991 mandatory=False)
991 mandatory=False)
992 interpart.id = 0
992 interpart.id = 0
993 yield _pack(_fpayloadsize, -1)
993 yield _pack(_fpayloadsize, -1)
994 for chunk in interpart.getchunks(ui=ui):
994 for chunk in interpart.getchunks(ui=ui):
995 yield chunk
995 yield chunk
996 outdebug(ui, 'closing payload chunk')
996 outdebug(ui, 'closing payload chunk')
997 # abort current part payload
997 # abort current part payload
998 yield _pack(_fpayloadsize, 0)
998 yield _pack(_fpayloadsize, 0)
999 raise exc_info[0], exc_info[1], exc_info[2]
999 raise exc_info[0], exc_info[1], exc_info[2]
1000 # end of payload
1000 # end of payload
1001 outdebug(ui, 'closing payload chunk')
1001 outdebug(ui, 'closing payload chunk')
1002 yield _pack(_fpayloadsize, 0)
1002 yield _pack(_fpayloadsize, 0)
1003 self._generated = True
1003 self._generated = True
1004
1004
1005 def _payloadchunks(self):
1005 def _payloadchunks(self):
1006 """yield chunks of a the part payload
1006 """yield chunks of a the part payload
1007
1007
1008 Exists to handle the different ways of providing data to a part."""
1008 Exists to handle the different ways of providing data to a part."""
1009 # we only support fixed size data now.
1009 # we only support fixed size data now.
1010 # This will be improved in the future.
1010 # This will be improved in the future.
1011 if util.safehasattr(self.data, 'next'):
1011 if util.safehasattr(self.data, 'next'):
1012 buff = util.chunkbuffer(self.data)
1012 buff = util.chunkbuffer(self.data)
1013 chunk = buff.read(preferedchunksize)
1013 chunk = buff.read(preferedchunksize)
1014 while chunk:
1014 while chunk:
1015 yield chunk
1015 yield chunk
1016 chunk = buff.read(preferedchunksize)
1016 chunk = buff.read(preferedchunksize)
1017 elif len(self.data):
1017 elif len(self.data):
1018 yield self.data
1018 yield self.data
1019
1019
1020
1020
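Editor's note: the stream emitted by getchunks() above uses a simple length-prefixed framing: a packed header size, the header blob, then payload chunks each preceded by their size, terminated by a zero size; a negative size (the flaginterrupt defined just below) announces an out-of-band part. The sketch below only restates that loop for a plain binary file object 'fp' (an assumption); the real reading logic lives in unpackermixin/unbundlepart.

# Hedged illustration of the part framing; fp is any binary file-like object.
def _example_readrawpart(fp):
    headersize = _unpack(_fpartheadersize,
                         fp.read(struct.calcsize(_fpartheadersize)))[0]
    header = fp.read(headersize)
    payload = []
    while True:
        size = _unpack(_fpayloadsize, fp.read(struct.calcsize(_fpayloadsize)))[0]
        if size == 0:
            break                      # end of this part's payload
        if size < 0:
            raise NotImplementedError('out-of-band part not handled here')
        payload.append(fp.read(size))
    return header, ''.join(payload)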
1021 flaginterrupt = -1
1021 flaginterrupt = -1
1022
1022
1023 class interrupthandler(unpackermixin):
1023 class interrupthandler(unpackermixin):
1024 """read one part and process it with restricted capability
1024 """read one part and process it with restricted capability
1025
1025
1026 This allows transmitting exceptions raised on the producer side during part
1026 This allows transmitting exceptions raised on the producer side during part
1027 iteration while the consumer is reading a part.
1027 iteration while the consumer is reading a part.
1028
1028
1029 Parts processed in this manner only have access to a ui object."""
1029 Parts processed in this manner only have access to a ui object."""
1030
1030
1031 def __init__(self, ui, fp):
1031 def __init__(self, ui, fp):
1032 super(interrupthandler, self).__init__(fp)
1032 super(interrupthandler, self).__init__(fp)
1033 self.ui = ui
1033 self.ui = ui
1034
1034
1035 def _readpartheader(self):
1035 def _readpartheader(self):
1036 """reads a part header size and return the bytes blob
1036 """reads a part header size and return the bytes blob
1037
1037
1038 returns None if empty"""
1038 returns None if empty"""
1039 headersize = self._unpack(_fpartheadersize)[0]
1039 headersize = self._unpack(_fpartheadersize)[0]
1040 if headersize < 0:
1040 if headersize < 0:
1041 raise error.BundleValueError('negative part header size: %i'
1041 raise error.BundleValueError('negative part header size: %i'
1042 % headersize)
1042 % headersize)
1043 indebug(self.ui, 'part header size: %i\n' % headersize)
1043 indebug(self.ui, 'part header size: %i\n' % headersize)
1044 if headersize:
1044 if headersize:
1045 return self._readexact(headersize)
1045 return self._readexact(headersize)
1046 return None
1046 return None
1047
1047
1048 def __call__(self):
1048 def __call__(self):
1049
1049
1050 self.ui.debug('bundle2-input-stream-interrupt:'
1050 self.ui.debug('bundle2-input-stream-interrupt:'
1051 ' opening out of band context\n')
1051 ' opening out of band context\n')
1052 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1052 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1053 headerblock = self._readpartheader()
1053 headerblock = self._readpartheader()
1054 if headerblock is None:
1054 if headerblock is None:
1055 indebug(self.ui, 'no part found during interruption.')
1055 indebug(self.ui, 'no part found during interruption.')
1056 return
1056 return
1057 part = unbundlepart(self.ui, headerblock, self._fp)
1057 part = unbundlepart(self.ui, headerblock, self._fp)
1058 op = interruptoperation(self.ui)
1058 op = interruptoperation(self.ui)
1059 _processpart(op, part)
1059 _processpart(op, part)
1060 self.ui.debug('bundle2-input-stream-interrupt:'
1060 self.ui.debug('bundle2-input-stream-interrupt:'
1061 ' closing out of band context\n')
1061 ' closing out of band context\n')
1062
1062
1063 class interruptoperation(object):
1063 class interruptoperation(object):
1064 """A limited operation to be use by part handler during interruption
1064 """A limited operation to be use by part handler during interruption
1065
1065
1066 It only has access to a ui object.
1066 It only has access to a ui object.
1067 """
1067 """
1068
1068
1069 def __init__(self, ui):
1069 def __init__(self, ui):
1070 self.ui = ui
1070 self.ui = ui
1071 self.reply = None
1071 self.reply = None
1072 self.captureoutput = False
1072 self.captureoutput = False
1073
1073
1074 @property
1074 @property
1075 def repo(self):
1075 def repo(self):
1076 raise RuntimeError('no repo access from stream interruption')
1076 raise RuntimeError('no repo access from stream interruption')
1077
1077
1078 def gettransaction(self):
1078 def gettransaction(self):
1079 raise TransactionUnavailable('no repo access from stream interruption')
1079 raise TransactionUnavailable('no repo access from stream interruption')
1080
1080
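Editor's sketch: a handler that can legitimately run through interrupthandler has to stay within the limits interruptoperation enforces above, i.e. touch nothing but op.ui. The 'ping' part type below is hypothetical and only illustrates that constraint.

@parthandler('ping')                 # hypothetical part type, for illustration
def _examplehandleping(op, inpart):
    # during an interruption only op.ui is usable; op.repo and
    # op.gettransaction() raise, as defined just above
    op.ui.debug('ping part received: %s\n' % inpart.read())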
1081 class unbundlepart(unpackermixin):
1081 class unbundlepart(unpackermixin):
1082 """a bundle part read from a bundle"""
1082 """a bundle part read from a bundle"""
1083
1083
1084 def __init__(self, ui, header, fp):
1084 def __init__(self, ui, header, fp):
1085 super(unbundlepart, self).__init__(fp)
1085 super(unbundlepart, self).__init__(fp)
1086 self.ui = ui
1086 self.ui = ui
1087 # unbundle state attr
1087 # unbundle state attr
1088 self._headerdata = header
1088 self._headerdata = header
1089 self._headeroffset = 0
1089 self._headeroffset = 0
1090 self._initialized = False
1090 self._initialized = False
1091 self.consumed = False
1091 self.consumed = False
1092 # part data
1092 # part data
1093 self.id = None
1093 self.id = None
1094 self.type = None
1094 self.type = None
1095 self.mandatoryparams = None
1095 self.mandatoryparams = None
1096 self.advisoryparams = None
1096 self.advisoryparams = None
1097 self.params = None
1097 self.params = None
1098 self.mandatorykeys = ()
1098 self.mandatorykeys = ()
1099 self._payloadstream = None
1099 self._payloadstream = None
1100 self._readheader()
1100 self._readheader()
1101 self._mandatory = None
1101 self._mandatory = None
1102 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1102 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1103 self._pos = 0
1103 self._pos = 0
1104
1104
1105 def _fromheader(self, size):
1105 def _fromheader(self, size):
1106 """return the next <size> byte from the header"""
1106 """return the next <size> byte from the header"""
1107 offset = self._headeroffset
1107 offset = self._headeroffset
1108 data = self._headerdata[offset:(offset + size)]
1108 data = self._headerdata[offset:(offset + size)]
1109 self._headeroffset = offset + size
1109 self._headeroffset = offset + size
1110 return data
1110 return data
1111
1111
1112 def _unpackheader(self, format):
1112 def _unpackheader(self, format):
1113 """read given format from header
1113 """read given format from header
1114
1114
1115 This automatically computes the size of the format to read.
1115 This automatically computes the size of the format to read.
1116 data = self._fromheader(struct.calcsize(format))
1116 data = self._fromheader(struct.calcsize(format))
1117 return _unpack(format, data)
1117 return _unpack(format, data)
1118
1118
1119 def _initparams(self, mandatoryparams, advisoryparams):
1119 def _initparams(self, mandatoryparams, advisoryparams):
1120 """internal function to setup all logic related parameters"""
1120 """internal function to setup all logic related parameters"""
1121 # make it read only to prevent people touching it by mistake.
1121 # make it read only to prevent people touching it by mistake.
1122 self.mandatoryparams = tuple(mandatoryparams)
1122 self.mandatoryparams = tuple(mandatoryparams)
1123 self.advisoryparams = tuple(advisoryparams)
1123 self.advisoryparams = tuple(advisoryparams)
1124 # user friendly UI
1124 # user friendly UI
1125 self.params = util.sortdict(self.mandatoryparams)
1125 self.params = util.sortdict(self.mandatoryparams)
1126 self.params.update(self.advisoryparams)
1126 self.params.update(self.advisoryparams)
1127 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1127 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1128
1128
1129 def _payloadchunks(self, chunknum=0):
1129 def _payloadchunks(self, chunknum=0):
1130 '''seek to specified chunk and start yielding data'''
1130 '''seek to specified chunk and start yielding data'''
1131 if len(self._chunkindex) == 0:
1131 if len(self._chunkindex) == 0:
1132 assert chunknum == 0, 'Must start with chunk 0'
1132 assert chunknum == 0, 'Must start with chunk 0'
1133 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1133 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1134 else:
1134 else:
1135 assert chunknum < len(self._chunkindex), \
1135 assert chunknum < len(self._chunkindex), \
1136 'Unknown chunk %d' % chunknum
1136 'Unknown chunk %d' % chunknum
1137 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1137 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1138
1138
1139 pos = self._chunkindex[chunknum][0]
1139 pos = self._chunkindex[chunknum][0]
1140 payloadsize = self._unpack(_fpayloadsize)[0]
1140 payloadsize = self._unpack(_fpayloadsize)[0]
1141 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1141 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1142 while payloadsize:
1142 while payloadsize:
1143 if payloadsize == flaginterrupt:
1143 if payloadsize == flaginterrupt:
1144 # interruption detection, the handler will now read a
1144 # interruption detection, the handler will now read a
1145 # single part and process it.
1145 # single part and process it.
1146 interrupthandler(self.ui, self._fp)()
1146 interrupthandler(self.ui, self._fp)()
1147 elif payloadsize < 0:
1147 elif payloadsize < 0:
1148 msg = 'negative payload chunk size: %i' % payloadsize
1148 msg = 'negative payload chunk size: %i' % payloadsize
1149 raise error.BundleValueError(msg)
1149 raise error.BundleValueError(msg)
1150 else:
1150 else:
1151 result = self._readexact(payloadsize)
1151 result = self._readexact(payloadsize)
1152 chunknum += 1
1152 chunknum += 1
1153 pos += payloadsize
1153 pos += payloadsize
1154 if chunknum == len(self._chunkindex):
1154 if chunknum == len(self._chunkindex):
1155 self._chunkindex.append((pos,
1155 self._chunkindex.append((pos,
1156 super(unbundlepart, self).tell()))
1156 super(unbundlepart, self).tell()))
1157 yield result
1157 yield result
1158 payloadsize = self._unpack(_fpayloadsize)[0]
1158 payloadsize = self._unpack(_fpayloadsize)[0]
1159 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1159 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1160
1160
1161 def _findchunk(self, pos):
1161 def _findchunk(self, pos):
1162 '''for a given payload position, return a chunk number and offset'''
1162 '''for a given payload position, return a chunk number and offset'''
1163 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1163 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1164 if ppos == pos:
1164 if ppos == pos:
1165 return chunk, 0
1165 return chunk, 0
1166 elif ppos > pos:
1166 elif ppos > pos:
1167 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1167 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1168 raise ValueError('Unknown chunk')
1168 raise ValueError('Unknown chunk')
1169
1169
1170 def _readheader(self):
1170 def _readheader(self):
1171 """read the header and setup the object"""
1171 """read the header and setup the object"""
1172 typesize = self._unpackheader(_fparttypesize)[0]
1172 typesize = self._unpackheader(_fparttypesize)[0]
1173 self.type = self._fromheader(typesize)
1173 self.type = self._fromheader(typesize)
1174 indebug(self.ui, 'part type: "%s"' % self.type)
1174 indebug(self.ui, 'part type: "%s"' % self.type)
1175 self.id = self._unpackheader(_fpartid)[0]
1175 self.id = self._unpackheader(_fpartid)[0]
1176 indebug(self.ui, 'part id: "%s"' % self.id)
1176 indebug(self.ui, 'part id: "%s"' % self.id)
1177 # extract mandatory bit from type
1177 # extract mandatory bit from type
1178 self.mandatory = (self.type != self.type.lower())
1178 self.mandatory = (self.type != self.type.lower())
1179 self.type = self.type.lower()
1179 self.type = self.type.lower()
1180 ## reading parameters
1180 ## reading parameters
1181 # param count
1181 # param count
1182 mancount, advcount = self._unpackheader(_fpartparamcount)
1182 mancount, advcount = self._unpackheader(_fpartparamcount)
1183 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1183 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1184 # param size
1184 # param size
1185 fparamsizes = _makefpartparamsizes(mancount + advcount)
1185 fparamsizes = _makefpartparamsizes(mancount + advcount)
1186 paramsizes = self._unpackheader(fparamsizes)
1186 paramsizes = self._unpackheader(fparamsizes)
1187 # make it a list of pairs again
1187 # make it a list of pairs again
1188 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1188 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1189 # split mandatory from advisory
1189 # split mandatory from advisory
1190 mansizes = paramsizes[:mancount]
1190 mansizes = paramsizes[:mancount]
1191 advsizes = paramsizes[mancount:]
1191 advsizes = paramsizes[mancount:]
1192 # retrieve param value
1192 # retrieve param value
1193 manparams = []
1193 manparams = []
1194 for key, value in mansizes:
1194 for key, value in mansizes:
1195 manparams.append((self._fromheader(key), self._fromheader(value)))
1195 manparams.append((self._fromheader(key), self._fromheader(value)))
1196 advparams = []
1196 advparams = []
1197 for key, value in advsizes:
1197 for key, value in advsizes:
1198 advparams.append((self._fromheader(key), self._fromheader(value)))
1198 advparams.append((self._fromheader(key), self._fromheader(value)))
1199 self._initparams(manparams, advparams)
1199 self._initparams(manparams, advparams)
1200 ## part payload
1200 ## part payload
1201 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1201 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1202 # the header data has been read; mark the part as initialized
1202 # the header data has been read; mark the part as initialized
1203 self._initialized = True
1203 self._initialized = True
1204
1204
1205 def read(self, size=None):
1205 def read(self, size=None):
1206 """read payload data"""
1206 """read payload data"""
1207 if not self._initialized:
1207 if not self._initialized:
1208 self._readheader()
1208 self._readheader()
1209 if size is None:
1209 if size is None:
1210 data = self._payloadstream.read()
1210 data = self._payloadstream.read()
1211 else:
1211 else:
1212 data = self._payloadstream.read(size)
1212 data = self._payloadstream.read(size)
1213 self._pos += len(data)
1213 self._pos += len(data)
1214 if size is None or len(data) < size:
1214 if size is None or len(data) < size:
1215 if not self.consumed and self._pos:
1215 if not self.consumed and self._pos:
1216 self.ui.debug('bundle2-input-part: total payload size %i\n'
1216 self.ui.debug('bundle2-input-part: total payload size %i\n'
1217 % self._pos)
1217 % self._pos)
1218 self.consumed = True
1218 self.consumed = True
1219 return data
1219 return data
1220
1220
1221 def tell(self):
1221 def tell(self):
1222 return self._pos
1222 return self._pos
1223
1223
1224 def seek(self, offset, whence=0):
1224 def seek(self, offset, whence=0):
1225 if whence == 0:
1225 if whence == 0:
1226 newpos = offset
1226 newpos = offset
1227 elif whence == 1:
1227 elif whence == 1:
1228 newpos = self._pos + offset
1228 newpos = self._pos + offset
1229 elif whence == 2:
1229 elif whence == 2:
1230 if not self.consumed:
1230 if not self.consumed:
1231 self.read()
1231 self.read()
1232 newpos = self._chunkindex[-1][0] - offset
1232 newpos = self._chunkindex[-1][0] - offset
1233 else:
1233 else:
1234 raise ValueError('Unknown whence value: %r' % (whence,))
1234 raise ValueError('Unknown whence value: %r' % (whence,))
1235
1235
1236 if newpos > self._chunkindex[-1][0] and not self.consumed:
1236 if newpos > self._chunkindex[-1][0] and not self.consumed:
1237 self.read()
1237 self.read()
1238 if not 0 <= newpos <= self._chunkindex[-1][0]:
1238 if not 0 <= newpos <= self._chunkindex[-1][0]:
1239 raise ValueError('Offset out of range')
1239 raise ValueError('Offset out of range')
1240
1240
1241 if self._pos != newpos:
1241 if self._pos != newpos:
1242 chunk, internaloffset = self._findchunk(newpos)
1242 chunk, internaloffset = self._findchunk(newpos)
1243 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1243 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1244 adjust = self.read(internaloffset)
1244 adjust = self.read(internaloffset)
1245 if len(adjust) != internaloffset:
1245 if len(adjust) != internaloffset:
1246 raise error.Abort(_('Seek failed\n'))
1246 raise error.Abort(_('Seek failed\n'))
1247 self._pos = newpos
1247 self._pos = newpos
1248
1248
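Editor's sketch: the chunk index maintained by _payloadchunks() is what makes tell()/seek() usable on a part that has already been partially read. Assuming 'part' is an unbundlepart instance, a consumer can peek at the payload and rewind:

def _example_peek(part):
    prefix = part.read(4)      # read a small prefix of the payload
    mark = part.tell()         # remember the payload offset
    rest = part.read()         # drain the part (builds the full chunk index)
    part.seek(mark)            # jump back; later reads resume from 'mark'
    return prefix, rest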
1249 # These are only the static capabilities.
1249 # These are only the static capabilities.
1250 # Check the 'getrepocaps' function for the rest.
1250 # Check the 'getrepocaps' function for the rest.
1251 capabilities = {'HG20': (),
1251 capabilities = {'HG20': (),
1252 'error': ('abort', 'unsupportedcontent', 'pushraced',
1252 'error': ('abort', 'unsupportedcontent', 'pushraced',
1253 'pushkey'),
1253 'pushkey'),
1254 'listkeys': (),
1254 'listkeys': (),
1255 'pushkey': (),
1255 'pushkey': (),
1256 'digests': tuple(sorted(util.DIGESTS.keys())),
1256 'digests': tuple(sorted(util.DIGESTS.keys())),
1257 'remote-changegroup': ('http', 'https'),
1257 'remote-changegroup': ('http', 'https'),
1258 'hgtagsfnodes': (),
1258 'hgtagsfnodes': (),
1259 }
1259 }
1260
1260
1261 def getrepocaps(repo, allowpushback=False):
1261 def getrepocaps(repo, allowpushback=False):
1262 """return the bundle2 capabilities for a given repo
1262 """return the bundle2 capabilities for a given repo
1263
1263
1264 Exists to allow extensions (like evolution) to mutate the capabilities.
1264 Exists to allow extensions (like evolution) to mutate the capabilities.
1265 """
1265 """
1266 caps = capabilities.copy()
1266 caps = capabilities.copy()
1267 caps['changegroup'] = tuple(sorted(
1267 caps['changegroup'] = tuple(sorted(
1268 changegroup.supportedincomingversions(repo)))
1268 changegroup.supportedincomingversions(repo)))
1269 if obsolete.isenabled(repo, obsolete.exchangeopt):
1269 if obsolete.isenabled(repo, obsolete.exchangeopt):
1270 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1270 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1271 caps['obsmarkers'] = supportedformat
1271 caps['obsmarkers'] = supportedformat
1272 if allowpushback:
1272 if allowpushback:
1273 caps['pushback'] = ()
1273 caps['pushback'] = ()
1274 return caps
1274 return caps
1275
1275
1276 def bundle2caps(remote):
1276 def bundle2caps(remote):
1277 """return the bundle capabilities of a peer as dict"""
1277 """return the bundle capabilities of a peer as dict"""
1278 raw = remote.capable('bundle2')
1278 raw = remote.capable('bundle2')
1279 if not raw and raw != '':
1279 if not raw and raw != '':
1280 return {}
1280 return {}
1281 capsblob = urlreq.unquote(remote.capable('bundle2'))
1281 capsblob = urlreq.unquote(remote.capable('bundle2'))
1282 return decodecaps(capsblob)
1282 return decodecaps(capsblob)
1283
1283
1284 def obsmarkersversion(caps):
1284 def obsmarkersversion(caps):
1285 """extract the list of supported obsmarkers versions from a bundle2caps dict
1285 """extract the list of supported obsmarkers versions from a bundle2caps dict
1286 """
1286 """
1287 obscaps = caps.get('obsmarkers', ())
1287 obscaps = caps.get('obsmarkers', ())
1288 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1288 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1289
1289
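Editor's note: the capability blob travels urlquoted over the wire (bundle2caps() above undoes that), and helpers such as obsmarkersversion() turn the raw 'V<n>' strings back into integers. A tiny illustration of the expected shapes, with made-up values:

# Illustration only; the dict stands in for a decoded capability blob.
caps = {'obsmarkers': ('V0', 'V1'), 'pushback': ()}
obsmarkersversion(caps)    # -> [0, 1]
obsmarkersversion({})      # -> [] when the capability is not advertised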
1290 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
1290 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
1291 """Write a bundle file and return its filename.
1291 """Write a bundle file and return its filename.
1292
1292
1293 Existing files will not be overwritten.
1293 Existing files will not be overwritten.
1294 If no filename is specified, a temporary file is created.
1294 If no filename is specified, a temporary file is created.
1295 bz2 compression can be turned off.
1295 bz2 compression can be turned off.
1296 The bundle file will be deleted in case of errors.
1296 The bundle file will be deleted in case of errors.
1297 """
1297 """
1298
1298
1299 if bundletype == "HG20":
1299 if bundletype == "HG20":
1300 bundle = bundle20(ui)
1300 bundle = bundle20(ui)
1301 bundle.setcompression(compression)
1301 bundle.setcompression(compression)
1302 part = bundle.newpart('changegroup', data=cg.getchunks())
1302 part = bundle.newpart('changegroup', data=cg.getchunks())
1303 part.addparam('version', cg.version)
1303 part.addparam('version', cg.version)
1304 if 'clcount' in cg.extras:
1304 if 'clcount' in cg.extras:
1305 part.addparam('nbchanges', str(cg.extras['clcount']),
1305 part.addparam('nbchanges', str(cg.extras['clcount']),
1306 mandatory=False)
1306 mandatory=False)
1307 chunkiter = bundle.getchunks()
1307 chunkiter = bundle.getchunks()
1308 else:
1308 else:
1309 # compression argument is only for the bundle2 case
1309 # compression argument is only for the bundle2 case
1310 assert compression is None
1310 assert compression is None
1311 if cg.version != '01':
1311 if cg.version != '01':
1312 raise error.Abort(_('old bundle types only support v1 '
1312 raise error.Abort(_('old bundle types only support v1 '
1313 'changegroups'))
1313 'changegroups'))
1314 header, comp = bundletypes[bundletype]
1314 header, comp = bundletypes[bundletype]
1315 if comp not in util.compressors:
1315 if comp not in util.compressors:
1316 raise error.Abort(_('unknown stream compression type: %s')
1316 raise error.Abort(_('unknown stream compression type: %s')
1317 % comp)
1317 % comp)
1318 z = util.compressors[comp]()
1318 z = util.compressors[comp]()
1319 subchunkiter = cg.getchunks()
1319 subchunkiter = cg.getchunks()
1320 def chunkiter():
1320 def chunkiter():
1321 yield header
1321 yield header
1322 for chunk in subchunkiter:
1322 for chunk in subchunkiter:
1323 yield z.compress(chunk)
1323 yield z.compress(chunk)
1324 yield z.flush()
1324 yield z.flush()
1325 chunkiter = chunkiter()
1325 chunkiter = chunkiter()
1326
1326
1327 # parse the changegroup data, otherwise we will block
1327 # parse the changegroup data, otherwise we will block
1328 # in case of sshrepo because we don't know the end of the stream
1328 # in case of sshrepo because we don't know the end of the stream
1329 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1329 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1330
1330
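Editor's sketch of calling writebundle(): the 'HG20' branch takes an optional compression name, while legacy bundle types imply their compression and therefore require compression=None and a v1 changegroup. The type and compression names ('BZ', 'HG10BZ') and the file names are assumptions for illustration, not taken from this file.

def _example_write(ui, cg):
    # bundle2 container; compression is a stream-level parameter
    writebundle(ui, cg, 'changes.hg2', 'HG20', compression='BZ')
    # legacy container; compression comes from the bundletype itself, so the
    # compression argument must stay None and cg.version must be '01'
    writebundle(ui, cg, 'changes.hg1', 'HG10BZ')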
1331 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1331 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1332 def handlechangegroup(op, inpart):
1332 def handlechangegroup(op, inpart):
1333 """apply a changegroup part on the repo
1333 """apply a changegroup part on the repo
1334
1334
1335 This is a very early implementation that will see massive rework before being
1335 This is a very early implementation that will see massive rework before being
1336 inflicted on any end-user.
1336 inflicted on any end-user.
1337 """
1337 """
1338 # Make sure we trigger a transaction creation
1338 # Make sure we trigger a transaction creation
1339 #
1339 #
1340 # The addchangegroup function will get a transaction object by itself, but
1340 # The addchangegroup function will get a transaction object by itself, but
1341 # we need to make sure we trigger the creation of a transaction object used
1341 # we need to make sure we trigger the creation of a transaction object used
1342 # for the whole processing scope.
1342 # for the whole processing scope.
1343 op.gettransaction()
1343 op.gettransaction()
1344 unpackerversion = inpart.params.get('version', '01')
1344 unpackerversion = inpart.params.get('version', '01')
1345 # We should raise an appropriate exception here
1345 # We should raise an appropriate exception here
1346 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1346 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1347 # the source and url passed here are overwritten by the ones contained in
1347 # the source and url passed here are overwritten by the ones contained in
1348 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1348 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1349 nbchangesets = None
1349 nbchangesets = None
1350 if 'nbchanges' in inpart.params:
1350 if 'nbchanges' in inpart.params:
1351 nbchangesets = int(inpart.params.get('nbchanges'))
1351 nbchangesets = int(inpart.params.get('nbchanges'))
1352 if ('treemanifest' in inpart.params and
1352 if ('treemanifest' in inpart.params and
1353 'treemanifest' not in op.repo.requirements):
1353 'treemanifest' not in op.repo.requirements):
1354 if len(op.repo.changelog) != 0:
1354 if len(op.repo.changelog) != 0:
1355 raise error.Abort(_(
1355 raise error.Abort(_(
1356 "bundle contains tree manifests, but local repo is "
1356 "bundle contains tree manifests, but local repo is "
1357 "non-empty and does not use tree manifests"))
1357 "non-empty and does not use tree manifests"))
1358 op.repo.requirements.add('treemanifest')
1358 op.repo.requirements.add('treemanifest')
1359 op.repo._applyopenerreqs()
1359 op.repo._applyopenerreqs()
1360 op.repo._writerequirements()
1360 op.repo._writerequirements()
1361 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1361 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1362 op.records.add('changegroup', {'return': ret})
1362 op.records.add('changegroup', {'return': ret})
1363 if op.reply is not None:
1363 if op.reply is not None:
1364 # This is definitely not the final form of this
1364 # This is definitely not the final form of this
1365 # return. But one needs to start somewhere.
1365 # return. But one needs to start somewhere.
1366 part = op.reply.newpart('reply:changegroup', mandatory=False)
1366 part = op.reply.newpart('reply:changegroup', mandatory=False)
1367 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1367 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1368 part.addparam('return', '%i' % ret, mandatory=False)
1368 part.addparam('return', '%i' % ret, mandatory=False)
1369 assert not inpart.read()
1369 assert not inpart.read()
1370
1370
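Editor's sketch: after the handler above runs, its outcome is available through op.records (and, when a reply bundle exists, as the 'reply:changegroup' part carrying 'in-reply-to' and 'return'). Assuming 'op' is the bundle operation returned by processbundle:

def _example_changegroupresults(op):
    # each record is the dict added above, e.g. {'return': 1}
    results = [r['return'] for r in op.records['changegroup']]
    # per the usual addchangegroup convention: 0 means an error, 1 means the
    # head count is unchanged, larger values mean new heads were added
    return results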
1371 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1371 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1372 ['digest:%s' % k for k in util.DIGESTS.keys()])
1372 ['digest:%s' % k for k in util.DIGESTS.keys()])
1373 @parthandler('remote-changegroup', _remotechangegroupparams)
1373 @parthandler('remote-changegroup', _remotechangegroupparams)
1374 def handleremotechangegroup(op, inpart):
1374 def handleremotechangegroup(op, inpart):
1375 """apply a bundle10 on the repo, given an url and validation information
1375 """apply a bundle10 on the repo, given an url and validation information
1376
1376
1377 All the information about the remote bundle to import is given as
1377 All the information about the remote bundle to import is given as
1378 parameters. The parameters include:
1378 parameters. The parameters include:
1379 - url: the url to the bundle10.
1379 - url: the url to the bundle10.
1380 - size: the bundle10 file size. It is used to validate that what was
1380 - size: the bundle10 file size. It is used to validate that what was
1381 retrieved by the client matches the server's knowledge about the bundle.
1381 retrieved by the client matches the server's knowledge about the bundle.
1382 - digests: a space separated list of the digest types provided as
1382 - digests: a space separated list of the digest types provided as
1383 parameters.
1383 parameters.
1384 - digest:<digest-type>: the hexadecimal representation of the digest with
1384 - digest:<digest-type>: the hexadecimal representation of the digest with
1385 that name. Like the size, it is used to validate that what was retrieved by
1385 that name. Like the size, it is used to validate that what was retrieved by
1386 the client matches what the server knows about the bundle.
1386 the client matches what the server knows about the bundle.
1387
1387
1388 When multiple digest types are given, all of them are checked.
1388 When multiple digest types are given, all of them are checked.
1389 """
1389 """
1390 try:
1390 try:
1391 raw_url = inpart.params['url']
1391 raw_url = inpart.params['url']
1392 except KeyError:
1392 except KeyError:
1393 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1393 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1394 parsed_url = util.url(raw_url)
1394 parsed_url = util.url(raw_url)
1395 if parsed_url.scheme not in capabilities['remote-changegroup']:
1395 if parsed_url.scheme not in capabilities['remote-changegroup']:
1396 raise error.Abort(_('remote-changegroup does not support %s urls') %
1396 raise error.Abort(_('remote-changegroup does not support %s urls') %
1397 parsed_url.scheme)
1397 parsed_url.scheme)
1398
1398
1399 try:
1399 try:
1400 size = int(inpart.params['size'])
1400 size = int(inpart.params['size'])
1401 except ValueError:
1401 except ValueError:
1402 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1402 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1403 % 'size')
1403 % 'size')
1404 except KeyError:
1404 except KeyError:
1405 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1405 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1406
1406
1407 digests = {}
1407 digests = {}
1408 for typ in inpart.params.get('digests', '').split():
1408 for typ in inpart.params.get('digests', '').split():
1409 param = 'digest:%s' % typ
1409 param = 'digest:%s' % typ
1410 try:
1410 try:
1411 value = inpart.params[param]
1411 value = inpart.params[param]
1412 except KeyError:
1412 except KeyError:
1413 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1413 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1414 param)
1414 param)
1415 digests[typ] = value
1415 digests[typ] = value
1416
1416
1417 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1417 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1418
1418
1419 # Make sure we trigger a transaction creation
1419 # Make sure we trigger a transaction creation
1420 #
1420 #
1421 # The addchangegroup function will get a transaction object by itself, but
1421 # The addchangegroup function will get a transaction object by itself, but
1422 # we need to make sure we trigger the creation of a transaction object used
1422 # we need to make sure we trigger the creation of a transaction object used
1423 # for the whole processing scope.
1423 # for the whole processing scope.
1424 op.gettransaction()
1424 op.gettransaction()
1425 from . import exchange
1425 from . import exchange
1426 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1426 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1427 if not isinstance(cg, changegroup.cg1unpacker):
1427 if not isinstance(cg, changegroup.cg1unpacker):
1428 raise error.Abort(_('%s: not a bundle version 1.0') %
1428 raise error.Abort(_('%s: not a bundle version 1.0') %
1429 util.hidepassword(raw_url))
1429 util.hidepassword(raw_url))
1430 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1430 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1431 op.records.add('changegroup', {'return': ret})
1431 op.records.add('changegroup', {'return': ret})
1432 if op.reply is not None:
1432 if op.reply is not None:
1433 # This is definitely not the final form of this
1433 # This is definitely not the final form of this
1434 # return. But one needs to start somewhere.
1434 # return. But one needs to start somewhere.
1435 part = op.reply.newpart('reply:changegroup')
1435 part = op.reply.newpart('reply:changegroup')
1436 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1436 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1437 part.addparam('return', '%i' % ret, mandatory=False)
1437 part.addparam('return', '%i' % ret, mandatory=False)
1438 try:
1438 try:
1439 real_part.validate()
1439 real_part.validate()
1440 except error.Abort as e:
1440 except error.Abort as e:
1441 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1441 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1442 (util.hidepassword(raw_url), str(e)))
1442 (util.hidepassword(raw_url), str(e)))
1443 assert not inpart.read()
1443 assert not inpart.read()
1444
1444
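Editor's sketch of the producing side of 'remote-changegroup': it fills exactly the parameters the handler above validates, i.e. a url, the exact byte size, the list of digest types and one 'digest:<type>' value per entry. The sha1 value here is a placeholder.

def _example_remotechangegrouppart(bundler, url, size, sha1hex):
    part = bundler.newpart('remote-changegroup')
    part.addparam('url', url)
    part.addparam('size', '%d' % size)
    part.addparam('digests', 'sha1')
    part.addparam('digest:sha1', sha1hex)
    return part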
1445 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1445 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1446 def handlereplychangegroup(op, inpart):
1446 def handlereplychangegroup(op, inpart):
1447 ret = int(inpart.params['return'])
1447 ret = int(inpart.params['return'])
1448 replyto = int(inpart.params['in-reply-to'])
1448 replyto = int(inpart.params['in-reply-to'])
1449 op.records.add('changegroup', {'return': ret}, replyto)
1449 op.records.add('changegroup', {'return': ret}, replyto)
1450
1450
1451 @parthandler('check:heads')
1451 @parthandler('check:heads')
1452 def handlecheckheads(op, inpart):
1452 def handlecheckheads(op, inpart):
1453 """check that head of the repo did not change
1453 """check that head of the repo did not change
1454
1454
1455 This is used to detect a push race when using unbundle.
1455 This is used to detect a push race when using unbundle.
1456 This replaces the "heads" argument of unbundle."""
1456 This replaces the "heads" argument of unbundle."""
1457 h = inpart.read(20)
1457 h = inpart.read(20)
1458 heads = []
1458 heads = []
1459 while len(h) == 20:
1459 while len(h) == 20:
1460 heads.append(h)
1460 heads.append(h)
1461 h = inpart.read(20)
1461 h = inpart.read(20)
1462 assert not h
1462 assert not h
1463 # Trigger a transaction so that we are guaranteed to have the lock now.
1463 # Trigger a transaction so that we are guaranteed to have the lock now.
1464 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1464 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1465 op.gettransaction()
1465 op.gettransaction()
1466 if sorted(heads) != sorted(op.repo.heads()):
1466 if sorted(heads) != sorted(op.repo.heads()):
1467 raise error.PushRaced('repository changed while pushing - '
1467 raise error.PushRaced('repository changed while pushing - '
1468 'please try again')
1468 'please try again')
1469
1469
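Editor's sketch: the matching producer just concatenates the 20-byte binary heads it saw when the bundle was built; the handler above slices them back out and raises PushRaced if the repository moved in the meantime. 'repo' is assumed to be a local repository object.

def _example_checkheadspart(bundler, repo):
    # raw 20-byte nodes, back to back, no separator
    bundler.newpart('check:heads', data=''.join(repo.heads()))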
1470 @parthandler('output')
1470 @parthandler('output')
1471 def handleoutput(op, inpart):
1471 def handleoutput(op, inpart):
1472 """forward output captured on the server to the client"""
1472 """forward output captured on the server to the client"""
1473 for line in inpart.read().splitlines():
1473 for line in inpart.read().splitlines():
1474 op.ui.status(('remote: %s\n' % line))
1474 op.ui.status(_('remote: %s\n') % line)
1475
1475
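Editor's sketch of the server side of 'output': whatever the server-side ui printed while processing is shipped back as an advisory part, and the loop above re-emits each line with the (now translatable) 'remote: ' prefix. 'captured' stands for the buffered server output and is an assumption.

def _example_forwardoutput(reply, captured):
    if captured:
        # advisory: a client that does not know 'output' may drop it silently
        reply.newpart('output', data=captured, mandatory=False)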
1476 @parthandler('replycaps')
1476 @parthandler('replycaps')
1477 def handlereplycaps(op, inpart):
1477 def handlereplycaps(op, inpart):
1478 """Notify that a reply bundle should be created
1478 """Notify that a reply bundle should be created
1479
1479
1480 The payload contains the capabilities information for the reply"""
1480 The payload contains the capabilities information for the reply"""
1481 caps = decodecaps(inpart.read())
1481 caps = decodecaps(inpart.read())
1482 if op.reply is None:
1482 if op.reply is None:
1483 op.reply = bundle20(op.ui, caps)
1483 op.reply = bundle20(op.ui, caps)
1484
1484
1485 class AbortFromPart(error.Abort):
1485 class AbortFromPart(error.Abort):
1486 """Sub-class of Abort that denotes an error from a bundle2 part."""
1486 """Sub-class of Abort that denotes an error from a bundle2 part."""
1487
1487
1488 @parthandler('error:abort', ('message', 'hint'))
1488 @parthandler('error:abort', ('message', 'hint'))
1489 def handleerrorabort(op, inpart):
1489 def handleerrorabort(op, inpart):
1490 """Used to transmit abort error over the wire"""
1490 """Used to transmit abort error over the wire"""
1491 raise AbortFromPart(inpart.params['message'],
1491 raise AbortFromPart(inpart.params['message'],
1492 hint=inpart.params.get('hint'))
1492 hint=inpart.params.get('hint'))
1493
1493
1494 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1494 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1495 'in-reply-to'))
1495 'in-reply-to'))
1496 def handleerrorpushkey(op, inpart):
1496 def handleerrorpushkey(op, inpart):
1497 """Used to transmit failure of a mandatory pushkey over the wire"""
1497 """Used to transmit failure of a mandatory pushkey over the wire"""
1498 kwargs = {}
1498 kwargs = {}
1499 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1499 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1500 value = inpart.params.get(name)
1500 value = inpart.params.get(name)
1501 if value is not None:
1501 if value is not None:
1502 kwargs[name] = value
1502 kwargs[name] = value
1503 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1503 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1504
1504
1505 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1505 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1506 def handleerrorunsupportedcontent(op, inpart):
1506 def handleerrorunsupportedcontent(op, inpart):
1507 """Used to transmit unknown content error over the wire"""
1507 """Used to transmit unknown content error over the wire"""
1508 kwargs = {}
1508 kwargs = {}
1509 parttype = inpart.params.get('parttype')
1509 parttype = inpart.params.get('parttype')
1510 if parttype is not None:
1510 if parttype is not None:
1511 kwargs['parttype'] = parttype
1511 kwargs['parttype'] = parttype
1512 params = inpart.params.get('params')
1512 params = inpart.params.get('params')
1513 if params is not None:
1513 if params is not None:
1514 kwargs['params'] = params.split('\0')
1514 kwargs['params'] = params.split('\0')
1515
1515
1516 raise error.BundleUnknownFeatureError(**kwargs)
1516 raise error.BundleUnknownFeatureError(**kwargs)
1517
1517
1518 @parthandler('error:pushraced', ('message',))
1518 @parthandler('error:pushraced', ('message',))
1519 def handleerrorpushraced(op, inpart):
1519 def handleerrorpushraced(op, inpart):
1520 """Used to transmit push race error over the wire"""
1520 """Used to transmit push race error over the wire"""
1521 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1521 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1522
1522
1523 @parthandler('listkeys', ('namespace',))
1523 @parthandler('listkeys', ('namespace',))
1524 def handlelistkeys(op, inpart):
1524 def handlelistkeys(op, inpart):
1525 """retrieve pushkey namespace content stored in a bundle2"""
1525 """retrieve pushkey namespace content stored in a bundle2"""
1526 namespace = inpart.params['namespace']
1526 namespace = inpart.params['namespace']
1527 r = pushkey.decodekeys(inpart.read())
1527 r = pushkey.decodekeys(inpart.read())
1528 op.records.add('listkeys', (namespace, r))
1528 op.records.add('listkeys', (namespace, r))
1529
1529
1530 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1530 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1531 def handlepushkey(op, inpart):
1531 def handlepushkey(op, inpart):
1532 """process a pushkey request"""
1532 """process a pushkey request"""
1533 dec = pushkey.decode
1533 dec = pushkey.decode
1534 namespace = dec(inpart.params['namespace'])
1534 namespace = dec(inpart.params['namespace'])
1535 key = dec(inpart.params['key'])
1535 key = dec(inpart.params['key'])
1536 old = dec(inpart.params['old'])
1536 old = dec(inpart.params['old'])
1537 new = dec(inpart.params['new'])
1537 new = dec(inpart.params['new'])
1538 # Grab the transaction to ensure that we have the lock before performing the
1538 # Grab the transaction to ensure that we have the lock before performing the
1539 # pushkey.
1539 # pushkey.
1540 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1540 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1541 op.gettransaction()
1541 op.gettransaction()
1542 ret = op.repo.pushkey(namespace, key, old, new)
1542 ret = op.repo.pushkey(namespace, key, old, new)
1543 record = {'namespace': namespace,
1543 record = {'namespace': namespace,
1544 'key': key,
1544 'key': key,
1545 'old': old,
1545 'old': old,
1546 'new': new}
1546 'new': new}
1547 op.records.add('pushkey', record)
1547 op.records.add('pushkey', record)
1548 if op.reply is not None:
1548 if op.reply is not None:
1549 rpart = op.reply.newpart('reply:pushkey')
1549 rpart = op.reply.newpart('reply:pushkey')
1550 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1550 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1551 rpart.addparam('return', '%i' % ret, mandatory=False)
1551 rpart.addparam('return', '%i' % ret, mandatory=False)
1552 if inpart.mandatory and not ret:
1552 if inpart.mandatory and not ret:
1553 kwargs = {}
1553 kwargs = {}
1554 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1554 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1555 if key in inpart.params:
1555 if key in inpart.params:
1556 kwargs[key] = inpart.params[key]
1556 kwargs[key] = inpart.params[key]
1557 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1557 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1558
1558
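Editor's sketch of the sending side of 'pushkey': the four values are encoded with pushkey.encode, the counterpart of the pushkey.decode used above; if the part is mandatory and the pushkey is refused, the receiver raises the PushkeyFailed shown above.

def _example_pushkeypart(bundler, namespace, key, old, new):
    enc = pushkey.encode
    part = bundler.newpart('pushkey')
    part.addparam('namespace', enc(namespace))
    part.addparam('key', enc(key))
    part.addparam('old', enc(old))
    part.addparam('new', enc(new))
    return part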
1559 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1559 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1560 def handlepushkeyreply(op, inpart):
1560 def handlepushkeyreply(op, inpart):
1561 """retrieve the result of a pushkey request"""
1561 """retrieve the result of a pushkey request"""
1562 ret = int(inpart.params['return'])
1562 ret = int(inpart.params['return'])
1563 partid = int(inpart.params['in-reply-to'])
1563 partid = int(inpart.params['in-reply-to'])
1564 op.records.add('pushkey', {'return': ret}, partid)
1564 op.records.add('pushkey', {'return': ret}, partid)
1565
1565
1566 @parthandler('obsmarkers')
1566 @parthandler('obsmarkers')
1567 def handleobsmarker(op, inpart):
1567 def handleobsmarker(op, inpart):
1568 """add a stream of obsmarkers to the repo"""
1568 """add a stream of obsmarkers to the repo"""
1569 tr = op.gettransaction()
1569 tr = op.gettransaction()
1570 markerdata = inpart.read()
1570 markerdata = inpart.read()
1571 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1571 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1572 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1572 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1573 % len(markerdata))
1573 % len(markerdata))
1574 # The mergemarkers call will crash if marker creation is not enabled.
1574 # The mergemarkers call will crash if marker creation is not enabled.
1575 # we want to avoid this if the part is advisory.
1575 # we want to avoid this if the part is advisory.
1576 if not inpart.mandatory and op.repo.obsstore.readonly:
1576 if not inpart.mandatory and op.repo.obsstore.readonly:
1577 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1577 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1578 return
1578 return
1579 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1579 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1580 if new:
1580 if new:
1581 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1581 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1582 op.records.add('obsmarkers', {'new': new})
1582 op.records.add('obsmarkers', {'new': new})
1583 if op.reply is not None:
1583 if op.reply is not None:
1584 rpart = op.reply.newpart('reply:obsmarkers')
1584 rpart = op.reply.newpart('reply:obsmarkers')
1585 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1585 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1586 rpart.addparam('new', '%i' % new, mandatory=False)
1586 rpart.addparam('new', '%i' % new, mandatory=False)
1587
1587
1588
1588
1589 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1589 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1590 def handleobsmarkerreply(op, inpart):
1590 def handleobsmarkerreply(op, inpart):
1591 """retrieve the result of a pushkey request"""
1591 """retrieve the result of a pushkey request"""
1592 ret = int(inpart.params['new'])
1592 ret = int(inpart.params['new'])
1593 partid = int(inpart.params['in-reply-to'])
1593 partid = int(inpart.params['in-reply-to'])
1594 op.records.add('obsmarkers', {'new': ret}, partid)
1594 op.records.add('obsmarkers', {'new': ret}, partid)
1595
1595
1596 @parthandler('hgtagsfnodes')
1596 @parthandler('hgtagsfnodes')
1597 def handlehgtagsfnodes(op, inpart):
1597 def handlehgtagsfnodes(op, inpart):
1598 """Applies .hgtags fnodes cache entries to the local repo.
1598 """Applies .hgtags fnodes cache entries to the local repo.
1599
1599
1600 Payload is pairs of 20 byte changeset nodes and filenodes.
1600 Payload is pairs of 20 byte changeset nodes and filenodes.
1601 """
1601 """
1602 # Grab the transaction so we ensure that we have the lock at this point.
1602 # Grab the transaction so we ensure that we have the lock at this point.
1603 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1603 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1604 op.gettransaction()
1604 op.gettransaction()
1605 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1605 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1606
1606
1607 count = 0
1607 count = 0
1608 while True:
1608 while True:
1609 node = inpart.read(20)
1609 node = inpart.read(20)
1610 fnode = inpart.read(20)
1610 fnode = inpart.read(20)
1611 if len(node) < 20 or len(fnode) < 20:
1611 if len(node) < 20 or len(fnode) < 20:
1612 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1612 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1613 break
1613 break
1614 cache.setfnode(node, fnode)
1614 cache.setfnode(node, fnode)
1615 count += 1
1615 count += 1
1616
1616
1617 cache.write()
1617 cache.write()
1618 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1618 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
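Editor's sketch of the producing side of 'hgtagsfnodes': the payload is nothing more than concatenated (changeset node, .hgtags filenode) 20-byte pairs, which is why the loop above reads fixed 20-byte slices and stops at the first incomplete pair. 'pairs' is assumed to be an iterable of such binary node pairs.

def _example_tagsfnodespart(bundler, pairs):
    chunks = []
    for node, fnode in pairs:
        assert len(node) == 20 and len(fnode) == 20
        chunks.append(node + fnode)
    bundler.newpart('hgtagsfnodes', data=''.join(chunks))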