bundle2: add some debugging information to the not-a-bundle error...
Siddharth Agarwal -
r33122:0aae80d1 default
@@ -1,1845 +1,1848 b''
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows:

- magic string
- stream level parameters
- payload parts (any number)
- end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

The binary format is as follows:

:params size: int32

  The total number of bytes used by the parameters.

:params value: arbitrary number of bytes

  A blob of `params size` containing the serialized version of all stream
  level parameters.

  The blob contains a space separated list of parameters. Parameters with a
  value are stored in the form `<name>=<value>`. Both name and value are
  urlquoted.

  Empty names are forbidden.

  Names MUST start with a letter. If this first letter is lower case, the
  parameter is advisory and can be safely ignored. However, when the first
  letter is capital, the parameter is mandatory and the bundling process MUST
  stop if it is not able to process it.

Stream parameters use a simple textual format for two main reasons:

- Stream level parameters should remain simple and we want to discourage any
  crazy usage.
- Textual data allows easy human inspection of a bundle2 header in case of
  trouble.

Any application level options MUST go into a bundle2 part instead.

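The stream-level layout described above can be sketched in standalone Python. The helper names `encodestreamparams`/`decodestreamparams` are illustrative only, not part of this module's API:

```python
import struct
from urllib.parse import quote, unquote

def encodestreamparams(params):
    # Space separated list; names and values urlquoted. A None value
    # encodes a parameter without a value.
    entries = []
    for name, value in sorted(params.items()):
        if value is None:
            entries.append(quote(name, safe=''))
        else:
            entries.append('%s=%s' % (quote(name, safe=''),
                                      quote(value, safe='')))
    blob = ' '.join(entries).encode('ascii')
    # big-endian int32 size prefix, then the blob itself
    return struct.pack('>i', len(blob)) + blob

def decodestreamparams(data):
    size = struct.unpack('>i', data[:4])[0]
    blob = data[4:4 + size].decode('ascii')
    params = {}
    if not blob:
        return params
    for entry in blob.split(' '):
        if '=' in entry:
            name, value = entry.split('=', 1)
            params[unquote(name)] = unquote(value)
        else:
            params[unquote(entry)] = None
    return params
```

Because both name and value are urlquoted, neither can contain a literal space, which is what makes the space-separated list unambiguous.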
Payload part
------------------------

The binary format is as follows:

:header size: int32

  The total number of bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

  The header defines how to interpret the part. It contains two pieces of
  data: the part type, and the part parameters.

  The part type is used to route to an application level handler that can
  interpret the payload.

  Part parameters are passed to the application level handler. They are
  meant to convey information that will help the application level object
  interpret the part payload.

  The binary format of the header is as follows:

  :typesize: (one byte)

  :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

  :partid: A 32-bit integer (unique in the bundle) that can be used to refer
           to this part.

  :parameters:

    Part's parameters may have arbitrary content; the binary structure is::

        <mandatory-count><advisory-count><param-sizes><param-data>

    :mandatory-count: 1 byte, number of mandatory parameters

    :advisory-count: 1 byte, number of advisory parameters

    :param-sizes:

        N couples of bytes, where N is the total number of parameters. Each
        couple contains (<size-of-key>, <size-of-value>) for one parameter.

    :param-data:

        A blob of bytes from which each parameter key and value can be
        retrieved using the list of size couples stored in the previous
        field.

    Mandatory parameters come first, then the advisory ones.

    Each parameter's key MUST be unique within the part.

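The parameter block layout can be exercised standalone with `struct`. The helper names here are hypothetical; this module builds the same layout via its `_fpartparamcount` and `_makefpartparamsizes` formats:

```python
import struct

def packpartparams(mandatory, advisory):
    # counts first, then one (key-size, value-size) byte couple per
    # parameter, then the concatenated key/value data. One-byte sizes
    # cap each key and value at 255 bytes.
    allparams = list(mandatory) + list(advisory)
    header = struct.pack('>BB', len(mandatory), len(advisory))
    sizes = struct.pack('>' + 'BB' * len(allparams),
                        *[n for k, v in allparams for n in (len(k), len(v))])
    data = b''.join(k + v for k, v in allparams)
    return header + sizes + data

def unpackpartparams(blob):
    nbm, nba = struct.unpack('>BB', blob[:2])
    total = nbm + nba
    sizes = struct.unpack('>' + 'BB' * total, blob[2:2 + 2 * total])
    offset = 2 + 2 * total
    params = []
    for i in range(total):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        key = blob[offset:offset + ksize]
        offset += ksize
        value = blob[offset:offset + vsize]
        offset += vsize
        params.append((key, value))
    # mandatory parameters come first on the wire
    return params[:nbm], params[nbm:]
```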
:payload:

    payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

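A minimal sketch of reading such a chunked payload from a byte stream (illustrative only; the unbundler classes later in this module do the real work):

```python
import io
import struct

def iterchunks(stream):
    # Yield chunk payloads until the zero-size end-of-payload chunk.
    while True:
        chunksize = struct.unpack('>i', stream.read(4))[0]
        if chunksize == 0:
            return
        if chunksize < 0:
            # reserved for special case processing (none defined yet)
            raise NotImplementedError('negative chunk size')
        yield stream.read(chunksize)
```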
Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part type
contains any uppercase char it is considered mandatory. When no handler is
known for a mandatory part, the process is aborted and an exception is raised.
If the part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read after an abort are processed. In
the future, dropping the stream may become an option for channels we do not
care to preserve.
"""

from __future__ import absolute_import

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    phases,
    pushkey,
    pycompat,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

_fphasesentry = '>i20s'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>' + ('BB' * nbparams)
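For example, with two parameters the generated format reads both (key-size, value-size) byte couples in one `struct` call:

```python
import struct

fmt = '>' + ('BB' * 2)  # same shape _makefpartparamsizes(2) would return
packed = struct.pack(fmt, 3, 5, 2, 0)  # two (key-size, value-size) couples
unpacked = struct.unpack(fmt, packed)
```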

parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator
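The registration/dispatch pattern can be demonstrated standalone. This uses a local mapping and a made-up `demo:echo` part type with a dict standing in for the part object; real handlers receive a bundleoperation and a part instance:

```python
handlers = {}

def parthandler(parttype, params=()):
    # register func under the lowercased type; case only encodes
    # mandatory vs advisory on the wire
    def _decorator(func):
        lparttype = parttype.lower()
        assert lparttype not in handlers
        handlers[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

@parthandler('demo:echo', ('data',))
def echohandler(op, part):
    return part['data']

# dispatch is case insensitive: lowercase the received type before lookup
part = {'type': 'DEMO:ECHO', 'data': 'hello'}
result = handlers[part['type'].lower()](None, part)
```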

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__
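A usage sketch of the record-keeping behaviour, using a stripped-down stand-in class so the snippet runs standalone (reply tracking is omitted):

```python
class records(object):
    # stand-in for unbundlerecords: per-category buckets plus one
    # chronological sequence shared by all categories
    def __init__(self):
        self._categories = {}
        self._sequences = []

    def add(self, category, entry):
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

recs = records()
recs.add('changegroup', {'return': 1})
recs.add('pushkey', {'namespace': 'phases'})
recs.add('changegroup', {'return': 0})
```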

class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.gettransaction = transactiongetter
        self.reply = None
        self.captureoutput = captureoutput

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op.gettransaction is None or op.gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except Exception as exc:
        # Any exceptions seeking to the end of the bundle at this point are
        # almost certainly related to the underlying stream being bad.
        # And, chances are that the exception we're handling is related to
        # getting in that bad state. So, we swallow the seeking error and
        # re-raise the original error.
        seekerror = False
        try:
            for nbpart, part in iterparts:
                # consume the bundle content
                part.seek(0, 2)
        except Exception:
            seekerror = True

        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from processing the old format. This is mostly
        # needed to handle different return codes to unbundle according to the
        # type of bundle. We should probably clean up or drop this return code
        # craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged

        # Re-raising from a variable loses the original stack. So only use
        # that form if we need to.
        if seekerror:
            raise exc
        else:
            raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op

def _processchangegroup(op, cg, tr, source, url, **kwargs):
    ret, addednodes = cg.apply(op.repo, tr, source, url, **kwargs)
    op.records.add('changegroup', {
        'return': ret,
        'addednodes': addednodes,
    })
    return ret

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    hardabort = False
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                      params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam('in-reply-to', str(part.id), mandatory=False)
    # If exiting or interrupted, do not attempt to seek the stream in the
    # finally block below. This makes abort faster.
    except (SystemExit, KeyboardInterrupt):
        hardabort = True
        raise
    finally:
        # consume the part content to not corrupt the stream.
        if not hardabort:
            part.seek(0, 2)

483
483
def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

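A minimal standalone sketch of the round trip the two functions above perform. This is not part of the module: it mirrors `encodecaps`/`decodecaps`, using Python 3's `urllib.parse` where the real (Python 2-era) code uses `urlreq`.

```python
# Sketch (assumption: standalone mirror of encodecaps/decodecaps above).
from urllib.parse import quote, unquote

def encode_caps(caps):
    # one capability per line, sorted; values comma-joined after '='
    chunks = []
    for name in sorted(caps):
        vals = [quote(v) for v in caps[name]]
        key = quote(name)
        if vals:
            key = "%s=%s" % (key, ','.join(vals))
        chunks.append(key)
    return '\n'.join(chunks)

def decode_caps(blob):
    # inverse of encode_caps; values are always a list
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps
```

Note that a capability without values ('HG20' here) encodes as a bare line and decodes back to an empty list.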
bundletypes = {
    "": ("", 'UN'),       # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual application-level payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

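A minimal sketch of the stream-parameter wire format `_paramchunk` above produces: names and values are url-quoted, a value is attached with `=`, and parameters are separated by single spaces. Python 3's `urllib.parse` stands in for the module's `urlreq`; this is an illustration, not module code.

```python
# Sketch (assumption: standalone mirror of bundle20._paramchunk above).
from urllib.parse import quote

def encode_stream_params(params):
    # params is a list of (name, value) pairs; value may be None
    blocks = []
    for par, value in params:
        par = quote(par)
        if value is not None:
            par = '%s=%s' % (par, quote(value))
        blocks.append(par)
    return ' '.join(blocks)
```

A parameter with no value encodes as a bare quoted name, which is why the decoder side (`_processallparams`) appends `None` when a block has no `=`.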
    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged


class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

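The magic-string check in `getunbundler` above can be sketched in isolation: the first four bytes split into a two-byte magic (`HG`) and a two-byte version (`20`), and anything else is rejected with the debug detail this changeset adds. This sketch uses Python 3 bytes for illustration, unlike the Python 2 module.

```python
# Sketch (assumption: standalone illustration of getunbundler's magic check).
def split_magicstring(magicstring):
    # first two bytes: magic; next two: format version
    magic, version = magicstring[:2], magicstring[2:4]
    if magic != b'HG':
        raise ValueError("invalid magic: %r (version %r), should be 'HG'"
                         % (magic, version))
    return version
```

For example, a gzip stream fed to this check fails immediately instead of being misread as a bundle.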
class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` methods."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """parse the stream-level parameter block into a sorted dict"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory, and this function will raise a KeyError when they are
        unknown.

        Note: no options are currently supported. Any input will either be
        ignored or cause a failure.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact the 'getbundle' command over 'ssh'
        has no way to know when the reply ends, relying on the bundle to be
        interpreted to know its end. This is terrible and we are sorry, but we
        needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)


    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

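The framing `_readpartheader` relies on can be sketched standalone. Assumption (for illustration only): header sizes travel as big-endian signed 32-bit integers, matching the `headersize < 0` check above, and a zero size marks the end of the part stream.

```python
# Sketch (assumption: standalone illustration of _readpartheader's framing).
import io
import struct

def read_part_header(fp):
    # big-endian signed 32-bit size prefix, then that many header bytes
    (headersize,) = struct.unpack('>i', fp.read(4))
    if headersize < 0:
        raise ValueError('negative part header size: %i' % headersize)
    # a zero-size header means there are no more parts
    return fp.read(headersize) if headersize else None

frames = struct.pack('>i', 5) + b'hello' + struct.pack('>i', 0)
fp = io.BytesIO(frames)
```

Calling `read_part_header(fp)` twice on this stream yields the header blob and then the end-of-parts marker, which is why `iterparts` can loop until it sees `None`.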
823 def compressed(self):
826 def compressed(self):
824 self.params # load params
827 self.params # load params
825 return self._compressed
828 return self._compressed
826
829
827 def close(self):
830 def close(self):
828 """close underlying file"""
831 """close underlying file"""
829 if util.safehasattr(self._fp, 'close'):
832 if util.safehasattr(self._fp, 'close'):
830 return self._fp.close()
833 return self._fp.close()
831
834
832 formatmap = {'20': unbundle20}
835 formatmap = {'20': unbundle20}
833
836
834 b2streamparamsmap = {}
837 b2streamparamsmap = {}
835
838
836 def b2streamparamhandler(name):
839 def b2streamparamhandler(name):
837 """register a handler for a stream level parameter"""
840 """register a handler for a stream level parameter"""
838 def decorator(func):
841 def decorator(func):
839 assert name not in formatmap
842 assert name not in formatmap
840 b2streamparamsmap[name] = func
843 b2streamparamsmap[name] = func
841 return func
844 return func
842 return decorator
845 return decorator
843
846
844 @b2streamparamhandler('compression')
847 @b2streamparamhandler('compression')
845 def processcompression(unbundler, param, value):
848 def processcompression(unbundler, param, value):
846 """read compression parameter and install payload decompression"""
849 """read compression parameter and install payload decompression"""
847 if value not in util.compengines.supportedbundletypes:
850 if value not in util.compengines.supportedbundletypes:
848 raise error.BundleUnknownFeatureError(params=(param,),
851 raise error.BundleUnknownFeatureError(params=(param,),
849 values=(value,))
852 values=(value,))
850 unbundler._compengine = util.compengines.forbundletype(value)
853 unbundler._compengine = util.compengines.forbundletype(value)
851 if value is not None:
854 if value is not None:
852 unbundler._compressed = True
855 unbundler._compressed = True
853
856
854 class bundlepart(object):
857 class bundlepart(object):
855 """A bundle2 part contains application level payload
858 """A bundle2 part contains application level payload
856
859
857 The part `type` is used to route the part to the application level
860 The part `type` is used to route the part to the application level
858 handler.
861 handler.
859
862
860 The part payload is contained in ``part.data``. It could be raw bytes or a
863 The part payload is contained in ``part.data``. It could be raw bytes or a
861 generator of byte chunks.
864 generator of byte chunks.
862
865
863 You can add parameters to the part using the ``addparam`` method.
866 You can add parameters to the part using the ``addparam`` method.
864 Parameters can be either mandatory (default) or advisory. Remote side
867 Parameters can be either mandatory (default) or advisory. Remote side
865 should be able to safely ignore the advisory ones.
868 should be able to safely ignore the advisory ones.
866
869
867 Both data and parameters cannot be modified after the generation has begun.
870 Both data and parameters cannot be modified after the generation has begun.
868 """
871 """
869
872
870 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
873 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
871 data='', mandatory=True):
874 data='', mandatory=True):
872 validateparttype(parttype)
875 validateparttype(parttype)
873 self.id = None
876 self.id = None
874 self.type = parttype
877 self.type = parttype
875 self._data = data
878 self._data = data
876 self._mandatoryparams = list(mandatoryparams)
879 self._mandatoryparams = list(mandatoryparams)
877 self._advisoryparams = list(advisoryparams)
880 self._advisoryparams = list(advisoryparams)
878 # checking for duplicated entries
881 # checking for duplicated entries
879 self._seenparams = set()
882 self._seenparams = set()
880 for pname, __ in self._mandatoryparams + self._advisoryparams:
883 for pname, __ in self._mandatoryparams + self._advisoryparams:
881 if pname in self._seenparams:
884 if pname in self._seenparams:
882 raise error.ProgrammingError('duplicated params: %s' % pname)
885 raise error.ProgrammingError('duplicated params: %s' % pname)
883 self._seenparams.add(pname)
886 self._seenparams.add(pname)
884 # status of the part's generation:
887 # status of the part's generation:
885 # - None: not started,
888 # - None: not started,
886 # - False: currently generated,
889 # - False: currently generated,
887 # - True: generation done.
890 # - True: generation done.
888 self._generated = None
891 self._generated = None
889 self.mandatory = mandatory
892 self.mandatory = mandatory
890
893
891 def __repr__(self):
894 def __repr__(self):
892 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
895 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
893 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
896 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
894 % (cls, id(self), self.id, self.type, self.mandatory))
897 % (cls, id(self), self.id, self.type, self.mandatory))
895
898
896 def copy(self):
899 def copy(self):
897 """return a copy of the part
900 """return a copy of the part
898
901
899 The new part have the very same content but no partid assigned yet.
902 The new part have the very same content but no partid assigned yet.
900 Parts with generated data cannot be copied."""
903 Parts with generated data cannot be copied."""
901 assert not util.safehasattr(self.data, 'next')
904 assert not util.safehasattr(self.data, 'next')
902 return self.__class__(self.type, self._mandatoryparams,
905 return self.__class__(self.type, self._mandatoryparams,
903 self._advisoryparams, self._data, self.mandatory)
906 self._advisoryparams, self._data, self.mandatory)
904
907
905 # methods used to defines the part content
908 # methods used to defines the part content
906 @property
909 @property
907 def data(self):
910 def data(self):
908 return self._data
911 return self._data
909
912
910 @data.setter
913 @data.setter
911 def data(self, data):
914 def data(self, data):
912 if self._generated is not None:
915 if self._generated is not None:
913 raise error.ReadOnlyPartError('part is being generated')
916 raise error.ReadOnlyPartError('part is being generated')
914 self._data = data
917 self._data = data
915
918
916 @property
919 @property
917 def mandatoryparams(self):
920 def mandatoryparams(self):
918 # make it an immutable tuple to force people through ``addparam``
921 # make it an immutable tuple to force people through ``addparam``
919 return tuple(self._mandatoryparams)
922 return tuple(self._mandatoryparams)
920
923
921 @property
924 @property
922 def advisoryparams(self):
925 def advisoryparams(self):
923 # make it an immutable tuple to force people through ``addparam``
926 # make it an immutable tuple to force people through ``addparam``
924 return tuple(self._advisoryparams)
927 return tuple(self._advisoryparams)
925
928
926 def addparam(self, name, value='', mandatory=True):
929 def addparam(self, name, value='', mandatory=True):
927 """add a parameter to the part
930 """add a parameter to the part
928
931
929 If 'mandatory' is set to True, the remote handler must claim support
932 If 'mandatory' is set to True, the remote handler must claim support
930 for this parameter or the unbundling will be aborted.
933 for this parameter or the unbundling will be aborted.
931
934
932 The 'name' and 'value' cannot exceed 255 bytes each.
935 The 'name' and 'value' cannot exceed 255 bytes each.
933 """
936 """
934 if self._generated is not None:
937 if self._generated is not None:
935 raise error.ReadOnlyPartError('part is being generated')
938 raise error.ReadOnlyPartError('part is being generated')
936 if name in self._seenparams:
939 if name in self._seenparams:
937 raise ValueError('duplicated params: %s' % name)
940 raise ValueError('duplicated params: %s' % name)
938 self._seenparams.add(name)
941 self._seenparams.add(name)
939 params = self._advisoryparams
942 params = self._advisoryparams
940 if mandatory:
943 if mandatory:
941 params = self._mandatoryparams
944 params = self._mandatoryparams
942 params.append((name, value))
945 params.append((name, value))
943
946
944 # methods used to generates the bundle2 stream
947 # methods used to generates the bundle2 stream
945 def getchunks(self, ui):
948 def getchunks(self, ui):
946 if self._generated is not None:
949 if self._generated is not None:
947 raise error.ProgrammingError('part can only be consumed once')
950 raise error.ProgrammingError('part can only be consumed once')
948 self._generated = False
951 self._generated = False
949
952
950 if ui.debugflag:
953 if ui.debugflag:
951 msg = ['bundle2-output-part: "%s"' % self.type]
954 msg = ['bundle2-output-part: "%s"' % self.type]
952 if not self.mandatory:
955 if not self.mandatory:
953 msg.append(' (advisory)')
956 msg.append(' (advisory)')
954 nbmp = len(self.mandatoryparams)
957 nbmp = len(self.mandatoryparams)
955 nbap = len(self.advisoryparams)
958 nbap = len(self.advisoryparams)
956 if nbmp or nbap:
959 if nbmp or nbap:
957 msg.append(' (params:')
960 msg.append(' (params:')
958 if nbmp:
961 if nbmp:
959 msg.append(' %i mandatory' % nbmp)
962 msg.append(' %i mandatory' % nbmp)
960 if nbap:
963 if nbap:
961 msg.append(' %i advisory' % nbmp)
964 msg.append(' %i advisory' % nbmp)
962 msg.append(')')
965 msg.append(')')
963 if not self.data:
966 if not self.data:
964 msg.append(' empty payload')
967 msg.append(' empty payload')
965 elif util.safehasattr(self.data, 'next'):
968 elif util.safehasattr(self.data, 'next'):
966 msg.append(' streamed payload')
969 msg.append(' streamed payload')
967 else:
970 else:
968 msg.append(' %i bytes payload' % len(self.data))
971 msg.append(' %i bytes payload' % len(self.data))
969 msg.append('\n')
972 msg.append('\n')
970 ui.debug(''.join(msg))
973 ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                 ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % exc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % exc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

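The framing emitted by getchunks above (each payload chunk prefixed by its size, a negative size reserved for interrupt signaling, and a zero size closing the payload) can be sketched in isolation. This is a hedged illustration, not Mercurial's API: '>i' matches this module's _fpayloadsize format, and frame is a hypothetical helper.

```python
import struct

_fpayloadsize = '>i'  # big-endian signed 32-bit, as used by this module

def frame(chunks):
    # Prefix each chunk with its size, then terminate with a zero size.
    out = []
    for chunk in chunks:
        out.append(struct.pack(_fpayloadsize, len(chunk)))
        out.append(chunk)
    out.append(struct.pack(_fpayloadsize, 0))
    return b''.join(out)
```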
    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if util.safehasattr(self.data, 'next'):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting an exception raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] #(payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to setup all logic related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos, self._tellfp()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

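The consumer side of that framing, mirroring the loop in _payloadchunks above, can be sketched as follows. This is an illustration under the same assumptions ('>i' size prefixes, -1 as the interrupt flag); readframed is a hypothetical helper, and it merely skips an interrupt marker rather than dispatching to an interrupt handler as the real code does.

```python
import io
import struct

FLAGINTERRUPT = -1  # matches flaginterrupt above

def readframed(fp):
    # Read size-prefixed chunks until a zero size closes the payload.
    chunks = []
    size = struct.unpack('>i', fp.read(4))[0]
    while size:
        if size == FLAGINTERRUPT:
            pass  # a real reader would process an out-of-band part here
        elif size < 0:
            raise ValueError('negative payload chunk size: %i' % size)
        else:
            chunks.append(fp.read(size))
        size = struct.unpack('>i', fp.read(4))[0]
    return b''.join(chunks)
```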
    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

1207
1210
1208 def _readheader(self):
1211 def _readheader(self):
1209 """read the header and setup the object"""
1212 """read the header and setup the object"""
1210 typesize = self._unpackheader(_fparttypesize)[0]
1213 typesize = self._unpackheader(_fparttypesize)[0]
1211 self.type = self._fromheader(typesize)
1214 self.type = self._fromheader(typesize)
1212 indebug(self.ui, 'part type: "%s"' % self.type)
1215 indebug(self.ui, 'part type: "%s"' % self.type)
1213 self.id = self._unpackheader(_fpartid)[0]
1216 self.id = self._unpackheader(_fpartid)[0]
1214 indebug(self.ui, 'part id: "%s"' % self.id)
1217 indebug(self.ui, 'part id: "%s"' % self.id)
1215 # extract mandatory bit from type
1218 # extract mandatory bit from type
1216 self.mandatory = (self.type != self.type.lower())
1219 self.mandatory = (self.type != self.type.lower())
1217 self.type = self.type.lower()
1220 self.type = self.type.lower()
1218 ## reading parameters
1221 ## reading parameters
1219 # param count
1222 # param count
1220 mancount, advcount = self._unpackheader(_fpartparamcount)
1223 mancount, advcount = self._unpackheader(_fpartparamcount)
1221 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1224 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1222 # param size
1225 # param size
1223 fparamsizes = _makefpartparamsizes(mancount + advcount)
1226 fparamsizes = _makefpartparamsizes(mancount + advcount)
1224 paramsizes = self._unpackheader(fparamsizes)
1227 paramsizes = self._unpackheader(fparamsizes)
1225 # make it a list of couple again
1228 # make it a list of couple again
1226 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1229 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1227 # split mandatory from advisory
1230 # split mandatory from advisory
1228 mansizes = paramsizes[:mancount]
1231 mansizes = paramsizes[:mancount]
1229 advsizes = paramsizes[mancount:]
1232 advsizes = paramsizes[mancount:]
1230 # retrieve param value
1233 # retrieve param value
1231 manparams = []
1234 manparams = []
1232 for key, value in mansizes:
1235 for key, value in mansizes:
1233 manparams.append((self._fromheader(key), self._fromheader(value)))
1236 manparams.append((self._fromheader(key), self._fromheader(value)))
1234 advparams = []
1237 advparams = []
1235 for key, value in advsizes:
1238 for key, value in advsizes:
1236 advparams.append((self._fromheader(key), self._fromheader(value)))
1239 advparams.append((self._fromheader(key), self._fromheader(value)))
1237 self._initparams(manparams, advparams)
1240 self._initparams(manparams, advparams)
1238 ## part payload
1241 ## part payload
1239 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1242 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1240 # we read the data, tell it
1243 # we read the data, tell it
1241 self._initialized = True
1244 self._initialized = True
1242
1245
    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode', 'strict')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as a dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The types of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we always want a changegroup in such a bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.getchangegroup(repo, source, outgoing,
                                    version=cgversion)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        part.addparam('nbchanges', str(cg.extras['clcount']),
                      mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = []
        for phase in phases.allphases:
            for head in headsbyphase[phase]:
                phasedata.append(_pack(_fphasesentry, phase, head))
        bundler.newpart('phase-heads', data=''.join(phasedata))

def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changesets
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

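The payload layout behind the comment above is simply concatenated (changeset node, .hgtags filenode) pairs, 20 bytes each, which is where the fixed 40 bytes per head comes from. A small sketch with an illustrative helper name:

```python
# Hypothetical sketch of the 'hgtagsfnodes' payload layout: raw
# (node, fnode) 20-byte pairs, concatenated with no framing.
def packfnodes(pairs):
    chunks = []
    for node, fnode in pairs:
        chunks.extend([node, fnode])
    return b''.join(chunks)

node = b'\xaa' * 20
fnode = b'\xbb' * 20
payload = packfnodes([(node, fnode)])
assert len(payload) == 40          # 40 bytes per head, as the comment says
assert payload[:20] == node
assert payload[20:] == fnode
```

At 40 bytes per head, even a million heads only costs about 40MB, which is the memory argument the comment makes against streaming.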
def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

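The negotiation `buildobsmarkerspart` leans on can be sketched roughly as follows: pick the newest obsmarker format both sides understand, or `None` when there is no overlap. This is a hedged approximation of `obsolete.commonversion`; the local format list here is an assumption for illustration only:

```python
# Assumed local obsmarker format versions, for illustration only.
localversions = (0, 1)

def commonversion(remoteversions):
    """Return the newest marker format shared with the remote, else None."""
    common = set(localversions) & set(remoteversions)
    if not common:
        return None
    return max(common)

assert commonversion((0, 1)) == 1   # both formats shared: prefer the newest
assert commonversion((0,)) == 0     # fall back to the only shared format
assert commonversion((2,)) is None  # no overlap: caller raises ValueError
```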
def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', str(cg.extras['clcount']),
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only support v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

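The legacy (non-HG20) branch above yields the magic header uncompressed and then streams the compressed changegroup after it. A self-contained sketch of that layout using bz2 as one concrete compression engine; the `b'HG10BZ'` header value here is a stand-in, since the real header/compressor pairs come from the `bundletypes` table:

```python
import bz2

# Sketch of the legacy bundle layout: uncompressed magic header,
# followed by the compressed changegroup stream.
def chunkiter(header, chunks):
    yield header
    compressor = bz2.BZ2Compressor()
    for chunk in chunks:
        data = compressor.compress(chunk)
        if data:
            yield data
    yield compressor.flush()

out = b''.join(chunkiter(b'HG10BZ', [b'payload-1', b'payload-2']))
assert out.startswith(b'HG10BZ')
# Everything past the header is one bz2 stream of the concatenated chunks.
assert bz2.decompress(out[len(b'HG10BZ'):]) == b'payload-1payload-2'
```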
def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

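The return convention combined above encodes head deltas: 1 means no head change, 1+n means n heads added, -1-n means n heads removed, and 0 means an error. Deltas therefore combine additively. A worked sketch with an illustrative helper name:

```python
# Hypothetical helper decoding the addchangegroup return convention:
# ret > 1  -> ret - 1 heads added
# ret < -1 -> -(ret + 1) heads removed (delta is negative)
# otherwise no head change
def headdelta(ret):
    if ret > 1:
        return ret - 1
    if ret < -1:
        return ret + 1
    return 0

# Two changegroups adding 2 heads and removing 1 net out to +1 head,
# which re-encodes to a combined return of 2.
deltas = [headdelta(3), headdelta(-2)]
assert deltas == [2, -1]
net = sum(deltas)
assert net == 1
combined = 1 + net if net > 0 else (-1 + net if net < 0 else 1)
assert combined == 2
```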
@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will see massive rework before
    being inflicted on any end-user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the ones contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

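The `@parthandler` decorator used on this and every handler below follows a simple registration pattern: a module-level dict maps part type names to handler functions, and the decorator records which parameters each handler understands. A minimal sketch under assumed names, not mercurial's exact implementation:

```python
# Illustrative registry: part type name -> handler function.
parthandlermapping = {}

def parthandler(parttype, params=()):
    """Decorator registering a function as a handler for <parttype>."""
    def _decorator(func):
        assert parttype not in parthandlermapping  # no duplicate handlers
        func.params = frozenset(params)            # advertised parameters
        parthandlermapping[parttype] = func
        return func
    return _decorator

@parthandler('demo:ping', ('count',))
def handleping(op, inpart):
    return 'pong'

assert parthandlermapping['demo:ping'] is handleping
assert handleping.params == frozenset(['count'])
```

Dispatch then becomes a dict lookup on the part type read from the stream, and unknown mandatory part types can be rejected before any payload is consumed.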
_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                 ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given a url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate that what was
        retrieved by the client matches the server's knowledge about the
        bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest
        with that name. Like the size, it is used to validate that what was
        retrieved by the client matches what the server knows about the
        bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

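The size/digest validation above is delegated to `util.digestchecker`, which wraps the remote stream. A hedged, simplified sketch of what such a wrapper does: hash and count every byte as it is read, then verify the total size and every declared digest once the payload has been fully consumed. The interface below is an approximation, not mercurial's exact class:

```python
import hashlib
import io

class digestchecker(object):
    """Read-through wrapper validating stream size and digests (sketch)."""

    def __init__(self, fh, size, digests):
        self._fh = fh
        self._size = size
        self._seen = 0
        self._hashers = dict((k, hashlib.new(k)) for k in digests)
        self._expected = digests  # digest name -> expected hex value

    def read(self, length=-1):
        data = self._fh.read(length)
        self._seen += len(data)
        for h in self._hashers.values():
            h.update(data)
        return data

    def validate(self):
        if self._seen != self._size:
            raise ValueError('size mismatch')
        for name, h in self._hashers.items():
            if h.hexdigest() != self._expected[name]:
                raise ValueError('%s mismatch' % name)

payload = b'not-really-a-bundle'
wrapped = digestchecker(io.BytesIO(payload), len(payload),
                        {'sha1': hashlib.sha1(payload).hexdigest()})
assert wrapped.read() == payload
wrapped.validate()  # size and sha1 both match, so no exception
```

This is why `validate()` is only called after `_processchangegroup` has drained the part: the checks are meaningless until the whole payload has flowed through `read()`.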
@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually updated
    during the push. Activity on unrelated heads is ignored.

    This allows servers with high traffic to avoid push contention as long as
    only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

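The difference between the two race checks above can be shown side by side: 'check:heads' demands that the full head set is exactly what the client saw, while 'check:updated-heads' only requires that each head the push touches still exists in the repo. The helper names below are illustrative:

```python
# 'check:heads' semantics: the whole head set must match exactly.
def checkheads(expected, current):
    return sorted(expected) == sorted(current)

# 'check:updated-heads' semantics: only the touched heads must survive.
def checkupdatedheads(touched, current):
    currentset = set(current)
    return all(h in currentset for h in touched)

seen = [b'\x01' * 20, b'\x02' * 20]
# An unrelated head appeared between the client's read and the push.
now = [b'\x01' * 20, b'\x02' * 20, b'\x03' * 20]

assert not checkheads(seen, now)               # strict check flags a race
assert checkupdatedheads([b'\x01' * 20], now)  # relaxed check still passes
```

This is exactly the contention the docstring describes: on a busy server, the strict check fails whenever anyone pushes anywhere, while the relaxed check only fails when the contested heads themselves changed.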
@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort errors over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content errors over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race errors over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

def _readphaseheads(inpart):
    headsbyphase = [[] for i in phases.allphases]
    entrysize = struct.calcsize(_fphasesentry)
    while True:
        entry = inpart.read(entrysize)
        if len(entry) < entrysize:
            if entry:
                raise error.Abort(_('bad phase-heads bundle part'))
            break
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
    return headsbyphase

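The parse loop above reads fixed-size records until the stream runs dry and treats a partial trailing record as corruption. A self-contained sketch of that logic, again assuming the `'>i20s'` entry layout and using illustrative names:

```python
import io
import struct

_fphasesentry = '>i20s'  # assumed layout: phase number + 20-byte node

def readphaseheads(fh, nphases=3):
    """Collect heads per phase; reject a truncated final entry."""
    headsbyphase = [[] for _ in range(nphases)]
    entrysize = struct.calcsize(_fphasesentry)
    while True:
        entry = fh.read(entrysize)
        if len(entry) < entrysize:
            if entry:  # a partial record means the part was truncated
                raise ValueError('bad phase-heads payload')
            break
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
    return headsbyphase

node = b'\x2a' * 20
data = struct.pack(_fphasesentry, 1, node)
assert readphaseheads(io.BytesIO(data))[1] == [node]
try:
    readphaseheads(io.BytesIO(data[:-1]))  # truncated final entry
except ValueError:
    pass
else:
    raise AssertionError('truncation not detected')
```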
@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = _readphaseheads(inpart)
    addednodes = []
    for entry in op.records['changegroup']:
        addednodes.extend(entry['addednodes'])
    phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase,
                        addednodes)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
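The paired 20-byte reads above can be sketched as a standalone loop: consume (node, fnode) pairs until the stream is exhausted, tolerating a short tail the way the handler does (log and stop, rather than abort). The helper name is illustrative:

```python
import io

def readfnodepairs(fh):
    """Yield complete (node, fnode) 20-byte pairs; drop an incomplete tail."""
    pairs = []
    while True:
        node = fh.read(20)
        fnode = fh.read(20)
        if len(node) < 20 or len(fnode) < 20:
            break  # incomplete trailing data: ignore it
        pairs.append((node, fnode))
    return pairs

node, fnode = b'\xaa' * 20, b'\xbb' * 20
assert readfnodepairs(io.BytesIO(node + fnode)) == [(node, fnode)]
# A short tail is silently dropped, matching the handler's behavior.
assert readfnodepairs(io.BytesIO(node + fnode + b'short')) == [(node, fnode)]
```

Since this part is advisory cache data, a truncated tail costs only a cache miss later, which is why the handler merely logs it instead of raising.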