bundle2: use modern Python division...
Augie Fackler
r33635:da7c285e default
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application-agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is architected as follows:

- magic string
- stream level parameters
- payload parts (any number)
- end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

The binary format is as follows:

:params size: int32

    The total number of bytes used by the parameters.

:params value: arbitrary number of bytes

    A blob of `params size` containing the serialized version of all stream
    level parameters.

    The blob contains a space separated list of parameters. Parameters with a
    value are stored in the form `<name>=<value>`. Both name and value are
    urlquoted.

    Empty names are forbidden.

    Names MUST start with a letter. If the first letter is lower case, the
    parameter is advisory and can be safely ignored. However, when the first
    letter is capital, the parameter is mandatory and the bundling process
    MUST stop if it is not able to process it.

Stream parameters use a simple textual format for two main reasons:

- Stream level parameters should remain simple and we want to discourage any
  crazy usage.
- Textual data allows easy human inspection of a bundle2 header in case of
  trouble.

Any application-level options MUST go into a bundle2 part instead.
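As a sketch of the rules above, the stream-level parameter blob could be decoded like this. The helper name `parse_stream_params` and the `compression`/`obsmarkers` parameter names are purely illustrative, not part of bundle2's actual API:

```python
import struct
import urllib.parse

def parse_stream_params(blob):
    """Parse a stream-level parameter blob: a space-separated list of
    urlquoted `<name>` or `<name>=<value>` entries (illustrative helper)."""
    params = {}
    for entry in blob.split(b' '):
        if not entry:
            continue
        name, sep, value = entry.partition(b'=')
        name = urllib.parse.unquote_to_bytes(name)
        # a parameter without '=' has no value at all
        value = urllib.parse.unquote_to_bytes(value) if sep else None
        if not name[0:1].isalpha():
            raise ValueError('stream parameter name must start with a letter')
        params[name] = value
    return params

# On the wire the blob is preceded by its int32 byte count:
blob = b'compression=GZ obsmarkers'
framed = struct.pack('>i', len(blob)) + blob
```

Whether a parameter is mandatory is then just a case check on the first letter of each key (`name[0:1].isupper()`).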

Payload part
------------------------

The binary format is as follows:

:header size: int32

    The total number of bytes used by the part header. When the header is
    empty (size = 0) this is interpreted as the end of stream marker.

:header:

    The header defines how to interpret the part. It contains two pieces of
    data: the part type, and the part parameters.

    The part type is used to route to an application level handler that can
    interpret the payload.

    Part parameters are passed to the application level handler. They are
    meant to convey information that will help the application level object
    interpret the part payload.

    The binary format of the header is as follows:

    :typesize: (one byte)

    :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

    :partid: A 32-bit integer (unique in the bundle) that can be used to
             refer to this part.

    :parameters:

        A part's parameters may have arbitrary content; the binary structure
        is::

            <mandatory-count><advisory-count><param-sizes><param-data>

        :mandatory-count: 1 byte, number of mandatory parameters

        :advisory-count: 1 byte, number of advisory parameters

        :param-sizes:

            N couples of bytes, where N is the total number of parameters.
            Each couple contains (<size-of-key>, <size-of-value>) for one
            parameter.

        :param-data:

            A blob of bytes from which each parameter key and value can be
            retrieved using the list of size couples stored in the previous
            field.

        Mandatory parameters come first, then the advisory ones.

        Each parameter's key MUST be unique within the part.
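The `<mandatory-count><advisory-count><param-sizes><param-data>` layout can be packed and unpacked as in the following sketch. The helper names and the `version`/`nbchanges` example keys are illustrative, not bundle2's own code:

```python
import struct

def pack_part_params(mandatory, advisory):
    """Pack part parameters as
    <mandatory-count><advisory-count><param-sizes><param-data>."""
    allparams = list(mandatory.items()) + list(advisory.items())
    header = struct.pack('>BB', len(mandatory), len(advisory))
    # one (key size, value size) couple per parameter
    sizes = b''.join(struct.pack('>BB', len(k), len(v)) for k, v in allparams)
    data = b''.join(k + v for k, v in allparams)
    return header + sizes + data

def unpack_part_params(blob):
    mcount, acount = struct.unpack('>BB', blob[:2])
    total = mcount + acount
    sizes = struct.unpack('>' + 'BB' * total, blob[2:2 + 2 * total])
    offset = 2 + 2 * total
    params = []
    for i in range(total):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        key = blob[offset:offset + ksize]
        value = blob[offset + ksize:offset + ksize + vsize]
        offset += ksize + vsize
        params.append((key, value, i < mcount))  # True = mandatory
    return params
```

Note how the mandatory parameters occupy the first `mandatory-count` slots, so the mandatory/advisory split is positional rather than flagged per entry.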

:payload:

    payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.
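The chunked payload framing described above can be sketched as a pair of illustrative helpers (the names `writechunks`/`readchunks` are not part of this module):

```python
import io
import struct

def writechunks(out, data, chunksize=4096):
    """Emit payload as <int32 size><data> chunks, terminated by a
    zero-size chunk."""
    for start in range(0, len(data), chunksize):
        chunk = data[start:start + chunksize]
        out.write(struct.pack('>i', len(chunk)))
        out.write(chunk)
    out.write(struct.pack('>i', 0))  # end-of-payload marker

def readchunks(src):
    """Yield chunk payloads until the zero-size terminator."""
    while True:
        size = struct.unpack('>i', src.read(4))[0]
        if size == 0:
            return
        if size < 0:
            # reserved for special case processing, none defined yet
            raise ValueError('negative chunk sizes not supported here')
        yield src.read(size)

buf = io.BytesIO()
writechunks(buf, b'x' * 10000, chunksize=4096)
buf.seek(0)
payload = b''.join(readchunks(buf))
```

The signed `>i` format matters: it is what leaves room for the negative `chunksize` special cases mentioned above.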

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part type
contains any uppercase char it is considered mandatory. When no handler is
known for a mandatory part, the process is aborted and an exception is raised.
If the part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read after an abort are processed. In
the future, dropping the stream may become an option for channels we do not
care to preserve.
"""

from __future__ import absolute_import, division

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    phases,
    pushkey,
    pycompat,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

_fphasesentry = '>i20s'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable, so we need to build that format
    dynamically.
    """
    return '>' + ('BB' * nbparams)
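For instance, for a part with three parameters the generated format reads all three (key size, value size) couples in a single `struct.unpack` call. A small standalone sketch (re-declaring the one-liner so the snippet runs on its own):

```python
import struct

def makefpartparamsizes(nbparams):
    # same construction as _makefpartparamsizes above
    return '>' + ('BB' * nbparams)

fmt = makefpartparamsizes(3)        # '>BBBBBB'
sizes = struct.unpack(fmt, bytes([7, 2, 9, 1, 3, 0]))
# regroup the flat tuple into (key size, value size) couples
couples = list(zip(sizes[0::2], sizes[1::2]))
```

Building one format string per part avoids looping over `struct.unpack('>BB', ...)` once per parameter.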

parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__
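The two access patterns from the docstring, per-category lookup and chronological iteration, can be demonstrated in isolation. This re-declares a trimmed copy of the class (without the reply tracking) purely so the snippet is self-contained:

```python
class unbundlerecords(object):
    """trimmed copy of the class above, for a standalone demo"""
    def __init__(self):
        self._categories = {}
        self._sequences = []

    def add(self, category, entry):
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

records = unbundlerecords()
records.add('changegroup', {'return': 1})
records.add('output', 'some text')
records.add('changegroup', {'return': 0})

bycat = records['changegroup']   # all entries of one category
chrono = list(records)           # ('category', obj) tuples, in add order
```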
278
278
279 class bundleoperation(object):
279 class bundleoperation(object):
280 """an object that represents a single bundling process
280 """an object that represents a single bundling process
281
281
282 Its purpose is to carry unbundle-related objects and states.
282 Its purpose is to carry unbundle-related objects and states.
283
283
284 A new object should be created at the beginning of each bundle processing.
284 A new object should be created at the beginning of each bundle processing.
285 The object is to be returned by the processing function.
285 The object is to be returned by the processing function.
286
286
287 The object has very little content now it will ultimately contain:
287 The object has very little content now it will ultimately contain:
288 * an access to the repo the bundle is applied to,
288 * an access to the repo the bundle is applied to,
289 * a ui object,
289 * a ui object,
290 * a way to retrieve a transaction to add changes to the repo,
290 * a way to retrieve a transaction to add changes to the repo,
291 * a way to record the result of processing each part,
291 * a way to record the result of processing each part,
292 * a way to construct a bundle response when applicable.
292 * a way to construct a bundle response when applicable.
293 """
293 """
294
294
295 def __init__(self, repo, transactiongetter, captureoutput=True):
295 def __init__(self, repo, transactiongetter, captureoutput=True):
296 self.repo = repo
296 self.repo = repo
297 self.ui = repo.ui
297 self.ui = repo.ui
298 self.records = unbundlerecords()
298 self.records = unbundlerecords()
299 self.reply = None
299 self.reply = None
300 self.captureoutput = captureoutput
300 self.captureoutput = captureoutput
301 self.hookargs = {}
301 self.hookargs = {}
302 self._gettransaction = transactiongetter
302 self._gettransaction = transactiongetter
303
303
304 def gettransaction(self):
304 def gettransaction(self):
305 transaction = self._gettransaction()
305 transaction = self._gettransaction()
306
306
307 if self.hookargs is not None:
307 if self.hookargs is not None:
308 # the ones added to the transaction supercede those added
308 # the ones added to the transaction supercede those added
309 # to the operation.
309 # to the operation.
310 self.hookargs.update(transaction.hookargs)
310 self.hookargs.update(transaction.hookargs)
311 transaction.hookargs = self.hookargs
311 transaction.hookargs = self.hookargs
312
312
313 # mark the hookargs as flushed. further attempts to add to
313 # mark the hookargs as flushed. further attempts to add to
314 # hookargs will result in an abort.
314 # hookargs will result in an abort.
315 self.hookargs = None
315 self.hookargs = None
316
316
317 return transaction
317 return transaction
318
318
319 def addhookargs(self, hookargs):
319 def addhookargs(self, hookargs):
320 if self.hookargs is None:
320 if self.hookargs is None:
321 raise error.Abort(
321 raise error.Abort(
322 _('attempted to add hooks to operation after transaction '
322 _('attempted to add hooks to operation after transaction '
323 'started'))
323 'started'))
324 self.hookargs.update(hookargs)
324 self.hookargs.update(hookargs)
325
325
326 class TransactionUnavailable(RuntimeError):
326 class TransactionUnavailable(RuntimeError):
327 pass
327 pass
328
328
329 def _notransaction():
329 def _notransaction():
330 """default method to get a transaction while processing a bundle
330 """default method to get a transaction while processing a bundle
331
331
332 Raise an exception to highlight the fact that no transaction was expected
332 Raise an exception to highlight the fact that no transaction was expected
333 to be created"""
333 to be created"""
334 raise TransactionUnavailable()
334 raise TransactionUnavailable()
335
335
336 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
336 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
337 # transform me into unbundler.apply() as soon as the freeze is lifted
337 # transform me into unbundler.apply() as soon as the freeze is lifted
338 if isinstance(unbundler, unbundle20):
338 if isinstance(unbundler, unbundle20):
339 tr.hookargs['bundle2'] = '1'
339 tr.hookargs['bundle2'] = '1'
340 if source is not None and 'source' not in tr.hookargs:
340 if source is not None and 'source' not in tr.hookargs:
341 tr.hookargs['source'] = source
341 tr.hookargs['source'] = source
342 if url is not None and 'url' not in tr.hookargs:
342 if url is not None and 'url' not in tr.hookargs:
343 tr.hookargs['url'] = url
343 tr.hookargs['url'] = url
344 return processbundle(repo, unbundler, lambda: tr)
344 return processbundle(repo, unbundler, lambda: tr)
345 else:
345 else:
346 # the transactiongetter won't be used, but we might as well set it
346 # the transactiongetter won't be used, but we might as well set it
347 op = bundleoperation(repo, lambda: tr)
347 op = bundleoperation(repo, lambda: tr)
348 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
348 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
349 return op
349 return op
350
350
351 def processbundle(repo, unbundler, transactiongetter=None, op=None):
351 def processbundle(repo, unbundler, transactiongetter=None, op=None):
352 """This function process a bundle, apply effect to/from a repo
352 """This function process a bundle, apply effect to/from a repo
353
353
354 It iterates over each part then searches for and uses the proper handling
354 It iterates over each part then searches for and uses the proper handling
355 code to process the part. Parts are processed in order.
355 code to process the part. Parts are processed in order.
356
356
357 Unknown Mandatory part will abort the process.
357 Unknown Mandatory part will abort the process.
358
358
359 It is temporarily possible to provide a prebuilt bundleoperation to the
359 It is temporarily possible to provide a prebuilt bundleoperation to the
360 function. This is used to ensure output is properly propagated in case of
360 function. This is used to ensure output is properly propagated in case of
361 an error during the unbundling. This output capturing part will likely be
361 an error during the unbundling. This output capturing part will likely be
362 reworked and this ability will probably go away in the process.
362 reworked and this ability will probably go away in the process.
363 """
363 """
364 if op is None:
364 if op is None:
365 if transactiongetter is None:
365 if transactiongetter is None:
366 transactiongetter = _notransaction
366 transactiongetter = _notransaction
367 op = bundleoperation(repo, transactiongetter)
367 op = bundleoperation(repo, transactiongetter)
368 # todo:
368 # todo:
369 # - replace this is a init function soon.
369 # - replace this is a init function soon.
370 # - exception catching
370 # - exception catching
371 unbundler.params
371 unbundler.params
372 if repo.ui.debugflag:
372 if repo.ui.debugflag:
373 msg = ['bundle2-input-bundle:']
373 msg = ['bundle2-input-bundle:']
374 if unbundler.params:
374 if unbundler.params:
375 msg.append(' %i params' % len(unbundler.params))
375 msg.append(' %i params' % len(unbundler.params))
376 if op.gettransaction is None or op.gettransaction is _notransaction:
376 if op.gettransaction is None or op.gettransaction is _notransaction:
377 msg.append(' no-transaction')
377 msg.append(' no-transaction')
378 else:
378 else:
379 msg.append(' with-transaction')
379 msg.append(' with-transaction')
380 msg.append('\n')
380 msg.append('\n')
381 repo.ui.debug(''.join(msg))
381 repo.ui.debug(''.join(msg))
382 iterparts = enumerate(unbundler.iterparts())
382 iterparts = enumerate(unbundler.iterparts())
383 part = None
383 part = None
384 nbpart = 0
384 nbpart = 0
385 try:
385 try:
386 for nbpart, part in iterparts:
386 for nbpart, part in iterparts:
387 _processpart(op, part)
387 _processpart(op, part)
388 except Exception as exc:
388 except Exception as exc:
389 # Any exceptions seeking to the end of the bundle at this point are
389 # Any exceptions seeking to the end of the bundle at this point are
390 # almost certainly related to the underlying stream being bad.
390 # almost certainly related to the underlying stream being bad.
391 # And, chances are that the exception we're handling is related to
391 # And, chances are that the exception we're handling is related to
392 # getting in that bad state. So, we swallow the seeking error and
392 # getting in that bad state. So, we swallow the seeking error and
393 # re-raise the original error.
393 # re-raise the original error.
394 seekerror = False
394 seekerror = False
395 try:
395 try:
396 for nbpart, part in iterparts:
396 for nbpart, part in iterparts:
397 # consume the bundle content
397 # consume the bundle content
398 part.seek(0, 2)
398 part.seek(0, 2)
399 except Exception:
399 except Exception:
400 seekerror = True
400 seekerror = True
401
401
402 # Small hack to let caller code distinguish exceptions from bundle2
402 # Small hack to let caller code distinguish exceptions from bundle2
403 # processing from processing the old format. This is mostly
403 # processing from processing the old format. This is mostly
404 # needed to handle different return codes to unbundle according to the
404 # needed to handle different return codes to unbundle according to the
405 # type of bundle. We should probably clean up or drop this return code
405 # type of bundle. We should probably clean up or drop this return code
406 # craziness in a future version.
406 # craziness in a future version.
407 exc.duringunbundle2 = True
407 exc.duringunbundle2 = True
408 salvaged = []
408 salvaged = []
409 replycaps = None
409 replycaps = None
410 if op.reply is not None:
410 if op.reply is not None:
411 salvaged = op.reply.salvageoutput()
411 salvaged = op.reply.salvageoutput()
412 replycaps = op.reply.capabilities
412 replycaps = op.reply.capabilities
413 exc._replycaps = replycaps
413 exc._replycaps = replycaps
414 exc._bundle2salvagedoutput = salvaged
414 exc._bundle2salvagedoutput = salvaged
415
415
416 # Re-raising from a variable loses the original stack. So only use
416 # Re-raising from a variable loses the original stack. So only use
417 # that form if we need to.
417 # that form if we need to.
418 if seekerror:
418 if seekerror:
419 raise exc
419 raise exc
420 else:
420 else:
421 raise
421 raise
422 finally:
422 finally:
423 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
423 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
424
424
425 return op
425 return op
426
426
427 def _processchangegroup(op, cg, tr, source, url, **kwargs):
427 def _processchangegroup(op, cg, tr, source, url, **kwargs):
428 ret = cg.apply(op.repo, tr, source, url, **kwargs)
428 ret = cg.apply(op.repo, tr, source, url, **kwargs)
429 op.records.add('changegroup', {
429 op.records.add('changegroup', {
430 'return': ret,
430 'return': ret,
431 })
431 })
432 return ret
432 return ret
433
433
434 def _processpart(op, part):
434 def _processpart(op, part):
435 """process a single part from a bundle
435 """process a single part from a bundle
436
436
437 The part is guaranteed to have been fully consumed when the function exits
437 The part is guaranteed to have been fully consumed when the function exits
438 (even if an exception is raised)."""
438 (even if an exception is raised)."""
439 status = 'unknown' # used by debug output
439 status = 'unknown' # used by debug output
440 hardabort = False
440 hardabort = False
441 try:
441 try:
442 try:
442 try:
443 handler = parthandlermapping.get(part.type)
443 handler = parthandlermapping.get(part.type)
444 if handler is None:
444 if handler is None:
445 status = 'unsupported-type'
445 status = 'unsupported-type'
446 raise error.BundleUnknownFeatureError(parttype=part.type)
446 raise error.BundleUnknownFeatureError(parttype=part.type)
447 indebug(op.ui, 'found a handler for part %r' % part.type)
447 indebug(op.ui, 'found a handler for part %r' % part.type)
448 unknownparams = part.mandatorykeys - handler.params
448 unknownparams = part.mandatorykeys - handler.params
449 if unknownparams:
449 if unknownparams:
450 unknownparams = list(unknownparams)
450 unknownparams = list(unknownparams)
451 unknownparams.sort()
451 unknownparams.sort()
452 status = 'unsupported-params (%s)' % unknownparams
452 status = 'unsupported-params (%s)' % unknownparams
453 raise error.BundleUnknownFeatureError(parttype=part.type,
453 raise error.BundleUnknownFeatureError(parttype=part.type,
454 params=unknownparams)
454 params=unknownparams)
455 status = 'supported'
455 status = 'supported'
456 except error.BundleUnknownFeatureError as exc:
456 except error.BundleUnknownFeatureError as exc:
457 if part.mandatory: # mandatory parts
457 if part.mandatory: # mandatory parts
458 raise
458 raise
459 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam('in-reply-to', str(part.id), mandatory=False)
    # If exiting or interrupted, do not attempt to seek the stream in the
    # finally block below. This makes abort faster.
    except (SystemExit, KeyboardInterrupt):
        hardabort = True
        raise
    finally:
        # consume the part content to not corrupt the stream.
        if not hardabort:
            part.seek(0, 2)


def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

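As an illustrative aside (not part of the original module): the caps wire format handled by `encodecaps`/`decodecaps` above is one capability per line, with comma-separated values after `=`, and both keys and values url-quoted. A standalone sketch using the Python 3 stdlib quoting in place of mercurial's `urlreq`:

```python
# Standalone sketch of the bundle2 caps wire format; uses Python 3
# urllib.parse quoting in place of mercurial's urlreq wrapper.
from urllib.parse import quote, unquote

def sketch_encodecaps(caps):
    # mirror encodecaps: sorted keys, values joined with ','
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v) for v in caps[ca]]
        ca = quote(ca)
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def sketch_decodecaps(blob):
    # mirror decodecaps: values are always returned as a list
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps
```

A capability with no values ends up as a bare line, so a blob round-trips back to list-valued entries.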
bundletypes = {
    "": ("", 'UN'), # only when using unbundle on ssh and old http servers
                    # since the unification ssh accepts a header but there
                    # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add a stream level parameter and `newpart`
    to populate it. Then call `getchunks` to retrieve all the binary chunks
    of data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """set up core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        As the part is directly added to the container, any failure to
        properly initialize the part after calling ``newpart`` should result
        in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged


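As an illustrative aside (not part of the original module): `bundle20.getchunks` above frames the stream as the magic string, a big-endian signed 32-bit length, then the space-separated, url-quoted parameter block. A standalone sketch of that stream-level header, assuming parameter names and values are already quoted:

```python
# Standalone sketch of the bundle2 stream-level header layout emitted
# by bundle20.getchunks: magic, '>i' parameter-block length, block.
import struct

def sketch_streamheader(params):
    # params: list of (name, value-or-None) pairs, assumed pre-quoted
    blocks = []
    for par, value in params:
        if value is not None:
            par = '%s=%s' % (par, value)
        blocks.append(par)
    blob = ' '.join(blocks).encode('ascii')
    # a signed length is used on the wire, hence the negative-size
    # checks in the unbundler below
    return b'HG20' + struct.pack('>i', len(blob)) + blob
```

With no parameters the length field is simply zero and no block follows, which matches the `if param:` guard in `getchunks`.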
class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process and apply the effect of all stream level parameters"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will
        be ignored when unknown. Those starting with an upper case letter
        are mandatory and this function will raise a KeyError when they are
        unknown.

        Note: no options are currently supported. Any input will either be
        ignored or cause a failure.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends, relying on the bundle
        to be interpreted to know its end. This is terrible and we are
        sorry, but we needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i'
                                             % size)
            yield self._readexact(size)


    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """reads a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

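As an illustrative aside (not part of the original module): the chunked framing that `unbundle20._forwardchunks` above loops over is a big-endian signed 32-bit size followed by that many payload bytes; a zero size ends a part, and two consecutive zero sizes end the bundle. A simplified standalone reader (negative interrupt sizes such as `flaginterrupt` are not handled here):

```python
# Standalone sketch of the bundle2 chunk framing: '>i' size prefix,
# payload bytes, terminated by two consecutive zero-size chunks.
import io
import struct

def sketch_readframes(fp):
    frames = []
    emptycount = 0
    while emptycount < 2:
        size = struct.unpack('>i', fp.read(4))[0]
        if size:
            emptycount = 0
            frames.append(fp.read(size))
        else:
            emptycount += 1
    return frames
```

This mirrors the "so we can brainlessly loop" trick in `_forwardchunks`: because part-header and payload size fields share one format, a single loop can scan the whole stream without interpreting part boundaries.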
formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True

879 class bundlepart(object):
879 class bundlepart(object):
880 """A bundle2 part contains application level payload
880 """A bundle2 part contains application level payload
881
881
882 The part `type` is used to route the part to the application level
882 The part `type` is used to route the part to the application level
883 handler.
883 handler.
884
884
885 The part payload is contained in ``part.data``. It could be raw bytes or a
885 The part payload is contained in ``part.data``. It could be raw bytes or a
886 generator of byte chunks.
886 generator of byte chunks.
887
887
888 You can add parameters to the part using the ``addparam`` method.
888 You can add parameters to the part using the ``addparam`` method.
889 Parameters can be either mandatory (default) or advisory. Remote side
889 Parameters can be either mandatory (default) or advisory. Remote side
890 should be able to safely ignore the advisory ones.
890 should be able to safely ignore the advisory ones.
891
891
892 Both data and parameters cannot be modified after the generation has begun.
892 Both data and parameters cannot be modified after the generation has begun.
893 """
893 """
894
894
895 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
895 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
896 data='', mandatory=True):
896 data='', mandatory=True):
897 validateparttype(parttype)
897 validateparttype(parttype)
898 self.id = None
898 self.id = None
899 self.type = parttype
899 self.type = parttype
900 self._data = data
900 self._data = data
901 self._mandatoryparams = list(mandatoryparams)
901 self._mandatoryparams = list(mandatoryparams)
902 self._advisoryparams = list(advisoryparams)
902 self._advisoryparams = list(advisoryparams)
903 # checking for duplicated entries
903 # checking for duplicated entries
904 self._seenparams = set()
904 self._seenparams = set()
905 for pname, __ in self._mandatoryparams + self._advisoryparams:
905 for pname, __ in self._mandatoryparams + self._advisoryparams:
906 if pname in self._seenparams:
906 if pname in self._seenparams:
907 raise error.ProgrammingError('duplicated params: %s' % pname)
907 raise error.ProgrammingError('duplicated params: %s' % pname)
908 self._seenparams.add(pname)
908 self._seenparams.add(pname)
909 # status of the part's generation:
909 # status of the part's generation:
910 # - None: not started,
910 # - None: not started,
911 # - False: currently generated,
911 # - False: currently generated,
912 # - True: generation done.
912 # - True: generation done.
913 self._generated = None
913 self._generated = None
914 self.mandatory = mandatory
914 self.mandatory = mandatory
915
915
916 def __repr__(self):
916 def __repr__(self):
917 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
917 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
918 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
918 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
919 % (cls, id(self), self.id, self.type, self.mandatory))
919 % (cls, id(self), self.id, self.type, self.mandatory))
920
920
921 def copy(self):
921 def copy(self):
922 """return a copy of the part
922 """return a copy of the part
923
923
924 The new part have the very same content but no partid assigned yet.
924 The new part have the very same content but no partid assigned yet.
925 Parts with generated data cannot be copied."""
925 Parts with generated data cannot be copied."""
926 assert not util.safehasattr(self.data, 'next')
926 assert not util.safehasattr(self.data, 'next')
927 return self.__class__(self.type, self._mandatoryparams,
927 return self.__class__(self.type, self._mandatoryparams,
928 self._advisoryparams, self._data, self.mandatory)
928 self._advisoryparams, self._data, self.mandatory)
929
929
930 # methods used to defines the part content
930 # methods used to defines the part content
931 @property
931 @property
932 def data(self):
932 def data(self):
933 return self._data
933 return self._data
934
934
935 @data.setter
935 @data.setter
936 def data(self, data):
936 def data(self, data):
937 if self._generated is not None:
937 if self._generated is not None:
938 raise error.ReadOnlyPartError('part is being generated')
938 raise error.ReadOnlyPartError('part is being generated')
939 self._data = data
939 self._data = data
940
940
941 @property
941 @property
942 def mandatoryparams(self):
942 def mandatoryparams(self):
943 # make it an immutable tuple to force people through ``addparam``
943 # make it an immutable tuple to force people through ``addparam``
944 return tuple(self._mandatoryparams)
944 return tuple(self._mandatoryparams)
945
945
946 @property
946 @property
947 def advisoryparams(self):
947 def advisoryparams(self):
948 # make it an immutable tuple to force people through ``addparam``
948 # make it an immutable tuple to force people through ``addparam``
949 return tuple(self._advisoryparams)
949 return tuple(self._advisoryparams)
950
950
951 def addparam(self, name, value='', mandatory=True):
951 def addparam(self, name, value='', mandatory=True):
952 """add a parameter to the part
952 """add a parameter to the part
953
953
954 If 'mandatory' is set to True, the remote handler must claim support
954 If 'mandatory' is set to True, the remote handler must claim support
955 for this parameter or the unbundling will be aborted.
955 for this parameter or the unbundling will be aborted.
956
956
957 The 'name' and 'value' cannot exceed 255 bytes each.
957 The 'name' and 'value' cannot exceed 255 bytes each.
958 """
958 """
959 if self._generated is not None:
959 if self._generated is not None:
960 raise error.ReadOnlyPartError('part is being generated')
960 raise error.ReadOnlyPartError('part is being generated')
961 if name in self._seenparams:
961 if name in self._seenparams:
962 raise ValueError('duplicated params: %s' % name)
962 raise ValueError('duplicated params: %s' % name)
963 self._seenparams.add(name)
963 self._seenparams.add(name)
964 params = self._advisoryparams
964 params = self._advisoryparams
965 if mandatory:
965 if mandatory:
966 params = self._mandatoryparams
966 params = self._mandatoryparams
967 params.append((name, value))
967 params.append((name, value))
968
968
    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % exc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % exc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if util.safehasattr(self.data, 'next'):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting an exception raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] #(payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to setup all logic related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos, self._tellfp()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % self.id)
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of pairs again
        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

1387 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1387 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1388 vfs=None, compression=None, compopts=None):
1388 vfs=None, compression=None, compopts=None):
1389 if bundletype.startswith('HG10'):
1389 if bundletype.startswith('HG10'):
1390 cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
1390 cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
1391 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1391 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1392 compression=compression, compopts=compopts)
1392 compression=compression, compopts=compopts)
1393 elif not bundletype.startswith('HG20'):
1393 elif not bundletype.startswith('HG20'):
1394 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1394 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1395
1395
1396 caps = {}
1396 caps = {}
1397 if 'obsolescence' in opts:
1397 if 'obsolescence' in opts:
1398 caps['obsmarkers'] = ('V1',)
1398 caps['obsmarkers'] = ('V1',)
1399 bundle = bundle20(ui, caps)
1399 bundle = bundle20(ui, caps)
1400 bundle.setcompression(compression, compopts)
1400 bundle.setcompression(compression, compopts)
1401 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1401 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1402 chunkiter = bundle.getchunks()
1402 chunkiter = bundle.getchunks()
1403
1403
1404 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1404 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1405
1405
1406 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1406 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1407 # We should eventually reconcile this logic with the one behind
1407 # We should eventually reconcile this logic with the one behind
1408 # 'exchange.getbundle2partsgenerator'.
1408 # 'exchange.getbundle2partsgenerator'.
1409 #
1409 #
1410 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1410 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1411 # different right now. So we keep them separated for now for the sake of
1411 # different right now. So we keep them separated for now for the sake of
1412 # simplicity.
1412 # simplicity.
1413
1413
1414 # we always want a changegroup in such bundle
1414 # we always want a changegroup in such bundle
1415 cgversion = opts.get('cg.version')
1415 cgversion = opts.get('cg.version')
1416 if cgversion is None:
1416 if cgversion is None:
1417 cgversion = changegroup.safeversion(repo)
1417 cgversion = changegroup.safeversion(repo)
1418 cg = changegroup.getchangegroup(repo, source, outgoing,
1418 cg = changegroup.getchangegroup(repo, source, outgoing,
1419 version=cgversion)
1419 version=cgversion)
1420 part = bundler.newpart('changegroup', data=cg.getchunks())
1420 part = bundler.newpart('changegroup', data=cg.getchunks())
1421 part.addparam('version', cg.version)
1421 part.addparam('version', cg.version)
1422 if 'clcount' in cg.extras:
1422 if 'clcount' in cg.extras:
1423 part.addparam('nbchanges', str(cg.extras['clcount']),
1423 part.addparam('nbchanges', str(cg.extras['clcount']),
1424 mandatory=False)
1424 mandatory=False)
1425 if opts.get('phases') and repo.revs('%ln and secret()',
1425 if opts.get('phases') and repo.revs('%ln and secret()',
1426 outgoing.missingheads):
1426 outgoing.missingheads):
1427 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1427 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = []
        for phase in phases.allphases:
            for head in headsbyphase[phase]:
                phasedata.append(_pack(_fphasesentry, phase, head))
        bundler.newpart('phase-heads', data=''.join(phasedata))

def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changesets
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

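The 'hgtagsfnodes' payload assembled above is simply alternating 20-byte changeset nodes and .hgtags filenodes, 40 bytes per head. A small sketch of producing and re-splitting such a payload (the hash values are fabricated):

```python
# Fabricated 20-byte node/fnode values standing in for real hashes.
pairs = [(b'\xaa' * 20, b'\xbb' * 20),
         (b'\xcc' * 20, b'\xdd' * 20)]

chunks = []
for node, fnode in pairs:
    chunks.extend([node, fnode])
payload = b''.join(chunks)          # 40 bytes per head

# A reader walks the payload in fixed 40-byte records.
decoded = [(payload[i:i + 20], payload[i + 20:i + 40])
           for i in range(0, len(payload), 40)]
assert decoded == pairs
```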
def buildobsmarkerspart(bundler, markers):
    """add an obsmarkers part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker
    format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', str(cg.extras['clcount']),
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only support v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

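The legacy (non-HG20) branch above streams an uncompressed magic header followed by compressed changegroup chunks from a generator. A self-contained sketch of that shape, using zlib in place of Mercurial's compression-engine layer (the header and chunk contents are made up for illustration):

```python
import zlib

def chunkiter(header, chunks):
    # Yield the raw magic header first, then the compressed body.
    comp = zlib.compressobj()
    yield header
    for chunk in chunks:
        data = comp.compress(chunk)
        if data:
            yield data
    yield comp.flush()

out = b''.join(chunkiter(b'HG10GZ', [b'part one ', b'part two']))
assert out.startswith(b'HG10GZ')
assert zlib.decompress(out[len(b'HG10GZ'):]) == b'part one part two'
```

Keeping the header outside the compressed stream lets a reader sniff the bundle type before picking a decompressor, which is the same reason `writebundle` yields `header` before the compressed chunks.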
def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

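The return-code convention combined above encodes failure as 0 and "n heads added/removed" as values beyond +1/-1. A standalone sketch of the same combining rule over plain integers:

```python
def combine(results):
    # Mirrors combinechangegroupresults: 0 anywhere means failure;
    # values beyond +/-1 carry a head-count delta that gets summed.
    changedheads = 0
    result = 1
    for ret in results:
        if ret == 0:
            return 0
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

assert combine([1, 1]) == 1      # no head-count change
assert combine([3, 2]) == 4      # +2 and +1 heads -> +3 total
assert combine([1, 0, 3]) == 0   # any failure wins
assert combine([-2]) == -2       # one head removed
```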
@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
                             'targetphase'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will see massive rework
    before being inflicted on any end-user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the ones contained
    # in the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    extrakwargs = {}
    targetphase = inpart.params.get('targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
    ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given a url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate that what was
        retrieved by the client matches the server's knowledge about the
        bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest
        with that name. Like the size, it is used to validate that what was
        retrieved by the client matches what the server knows about the
        bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

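`util.digestchecker` wraps the remote stream and validates the byte count plus every advertised digest once the payload has been consumed. A rough standalone equivalent using only hashlib (the function name and error messages are illustrative, not Mercurial's actual API):

```python
import hashlib

def checkpayload(data, size, digests):
    # 'digests' maps a hash name (e.g. 'sha1') to the expected hex digest.
    if len(data) != size:
        raise ValueError('size mismatch: expected %d, got %d'
                         % (size, len(data)))
    for typ, expected in digests.items():
        actual = hashlib.new(typ, data).hexdigest()
        if actual != expected:
            raise ValueError('%s mismatch: expected %s, got %s'
                             % (typ, expected, actual))

payload = b'changegroup bytes'
good = {'sha1': hashlib.sha1(payload).hexdigest()}
checkpayload(payload, len(payload), good)   # passes silently
```

Checking the size as well as the digests catches truncated downloads cheaply before any hash comparison matters.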
@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

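Both head-checking handlers read their payload as back-to-back 20-byte binary nodes until the stream runs dry; a short final read would indicate a corrupt part. The loop can be exercised on its own with an in-memory stream (the node values are fabricated):

```python
import io

def readheads(part):
    # Mirrors the loop in handlecheckheads: consume 20-byte nodes until
    # an empty read signals the end of the payload.
    heads = []
    h = part.read(20)
    while len(h) == 20:
        heads.append(h)
        h = part.read(20)
    assert not h   # a trailing partial record would mean corruption
    return heads

nodes = [b'\x01' * 20, b'\x02' * 20]
assert readheads(io.BytesIO(b''.join(nodes))) == nodes
```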
@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually
    updated during the push. If other activities happen on unrelated heads,
    they are ignored.

    This allows servers with high traffic to avoid push contention as long
    as only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

def _readphaseheads(inpart):
    headsbyphase = [[] for i in phases.allphases]
    entrysize = struct.calcsize(_fphasesentry)
    while True:
        entry = inpart.read(entrysize)
        if len(entry) < entrysize:
            if entry:
                raise error.Abort(_('bad phase-heads bundle part'))
            break
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
    return headsbyphase

@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = _readphaseheads(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not '
                         'enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


1845 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1845 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1846 def handleobsmarkerreply(op, inpart):
1846 def handleobsmarkerreply(op, inpart):
1847 """retrieve the result of a pushkey request"""
1847 """retrieve the result of a pushkey request"""
1848 ret = int(inpart.params['new'])
1848 ret = int(inpart.params['new'])
1849 partid = int(inpart.params['in-reply-to'])
1849 partid = int(inpart.params['in-reply-to'])
1850 op.records.add('obsmarkers', {'new': ret}, partid)
1850 op.records.add('obsmarkers', {'new': ret}, partid)
1851
1851
1852 @parthandler('hgtagsfnodes')
1852 @parthandler('hgtagsfnodes')
1853 def handlehgtagsfnodes(op, inpart):
1853 def handlehgtagsfnodes(op, inpart):
1854 """Applies .hgtags fnodes cache entries to the local repo.
1854 """Applies .hgtags fnodes cache entries to the local repo.
1855
1855
1856 Payload is pairs of 20 byte changeset nodes and filenodes.
1856 Payload is pairs of 20 byte changeset nodes and filenodes.
1857 """
1857 """
1858 # Grab the transaction so we ensure that we have the lock at this point.
1858 # Grab the transaction so we ensure that we have the lock at this point.
1859 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1859 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1860 op.gettransaction()
1860 op.gettransaction()
1861 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1861 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1862
1862
1863 count = 0
1863 count = 0
1864 while True:
1864 while True:
1865 node = inpart.read(20)
1865 node = inpart.read(20)
1866 fnode = inpart.read(20)
1866 fnode = inpart.read(20)
1867 if len(node) < 20 or len(fnode) < 20:
1867 if len(node) < 20 or len(fnode) < 20:
1868 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1868 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1869 break
1869 break
1870 cache.setfnode(node, fnode)
1870 cache.setfnode(node, fnode)
1871 count += 1
1871 count += 1
1872
1872
1873 cache.write()
1873 cache.write()
1874 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1874 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)