bundle2: add debug info about the number of stream params...
Siddharth Agarwal
r33123:126eae7d default
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is architectured as follows:

 - magic string
 - stream level parameters
 - payload parts (any number)
 - end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

The binary format is as follows:

:params size: int32

  The total number of Bytes used by the parameters

:params value: arbitrary number of Bytes

  A blob of `params size` containing the serialized version of all stream
  level parameters.

  The blob contains a space separated list of parameters. Parameters with a
  value are stored in the form `<name>=<value>`. Both name and value are
  urlquoted.

  Empty names are forbidden.

  A name MUST start with a letter. If this first letter is lower case, the
  parameter is advisory and can be safely ignored. However, when the first
  letter is capital, the parameter is mandatory and the bundling process MUST
  stop if it is not able to process it.

  Stream parameters use a simple textual format for two main reasons:

  - Stream level parameters should remain simple and we want to discourage
    any crazy usage.
  - Textual data allow easy human inspection of a bundle2 header in case of
    troubles.

  Any application level options MUST go into a bundle2 part instead.

Payload part
------------------------

The binary format is as follows:

:header size: int32

  The total number of Bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

    The header defines how to interpret the part. It contains two pieces of
    data: the part type, and the part parameters.

    The part type is used to route to an application level handler that can
    interpret the payload.

    Part parameters are passed to the application level handler. They are
    meant to convey information that will help the application level object
    interpret the part payload.

    The binary format of the header is as follows:

    :typesize: (one byte)

    :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

    :partid: A 32-bit integer (unique in the bundle) that can be used to refer
             to this part.

    :parameters:

        A part's parameters may have arbitrary content, the binary structure
        is::

            <mandatory-count><advisory-count><param-sizes><param-data>

        :mandatory-count: 1 byte, number of mandatory parameters

        :advisory-count:  1 byte, number of advisory parameters

        :param-sizes:

            N couples of bytes, where N is the total number of parameters.
            Each couple contains (<size-of-key>, <size-of-value>) for one
            parameter.

        :param-data:

            A blob of bytes from which each parameter key and value can be
            retrieved using the list of size couples stored in the previous
            field.

            Mandatory parameters come first, then the advisory ones.

            Each parameter's key MUST be unique within the part.

:payload:

    payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part type
contains any uppercase char it is considered mandatory. When no handler is
known for a mandatory part, the process is aborted and an exception is raised.
If the part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read during an abort are processed. In
the future, dropping the stream may become an option for channels we do not
care to preserve.
"""
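As an illustration of the stream-level parameter encoding described above, here is a minimal standalone sketch. `buildstreamparams` and `readstreamparams` are hypothetical helpers written for this example (they are not part of this module), and `urllib.parse` stands in for Mercurial's `urlreq` wrapper:

```python
import struct
from urllib.parse import quote, unquote

def buildstreamparams(params):
    # serialize as a space separated, urlquoted list, prefixed by an int32 size
    items = []
    for name, value in params:
        if value is None:
            items.append(quote(name, safe=''))
        else:
            items.append('%s=%s' % (quote(name, safe=''), quote(value, safe='')))
    blob = ' '.join(items).encode('ascii')
    return struct.pack('>i', len(blob)) + blob

def readstreamparams(data):
    size = struct.unpack('>i', data[:4])[0]
    blob = data[4:4 + size].decode('ascii')
    params = []
    for item in blob.split(' ') if blob else []:
        if '=' in item:
            name, value = item.split('=', 1)
            params.append((unquote(name), unquote(value)))
        else:
            params.append((unquote(item), None))
    return params
```

Per the rules above, a parameter whose name starts with a capital letter (e.g. `Compression`) would be mandatory, while a lower-case one is advisory.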

from __future__ import absolute_import

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    phases,
    pushkey,
    pycompat,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

_fphasesentry = '>i20s'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

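The `<chunksize><chunkdata>` payload framing from the module docstring uses the `_fpayloadsize` (`'>i'`) format defined above. A minimal standalone sketch follows; `writechunks` and `readpayload` are hypothetical helpers for illustration only, not functions from this module:

```python
import struct

_fpayloadsize = '>i'  # mirrors the constant above

def writechunks(payload, chunksize=4096):
    # frame payload as <int32 size><data> chunks, ending with a zero-size chunk
    out = []
    for start in range(0, len(payload), chunksize):
        chunk = payload[start:start + chunksize]
        out.append(struct.pack(_fpayloadsize, len(chunk)) + chunk)
    out.append(struct.pack(_fpayloadsize, 0))  # end-of-payload marker
    return b''.join(out)

def readpayload(data):
    pos, chunks = 0, []
    while True:
        (size,) = struct.unpack(_fpayloadsize, data[pos:pos + 4])
        pos += 4
        if size == 0:  # zero-size chunk concludes the payload
            return b''.join(chunks)
        if size < 0:
            raise ValueError('special-case chunks not handled in this sketch')
        chunks.append(data[pos:pos + size])
        pos += size
```

Note the negative-size branch: the format reserves negative `chunksize` values for special case processing, which this sketch simply rejects.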
def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>' + ('BB' * nbparams)
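For illustration, the part-parameter block described in the module docstring (`<mandatory-count><advisory-count><param-sizes><param-data>`) can be decoded with the dynamic format from `_makefpartparamsizes`. `readpartparams` is a hypothetical helper written for this sketch, not a function from this module:

```python
import struct

def _makefpartparamsizes(nbparams):
    # same dynamic format as the function above
    return '>' + ('BB' * nbparams)

def readpartparams(block):
    # decode <mandatory-count><advisory-count><param-sizes><param-data>
    mancount, advcount = struct.unpack('>BB', block[:2])
    total = mancount + advcount
    sizes = struct.unpack(_makefpartparamsizes(total), block[2:2 + 2 * total])
    data = block[2 + 2 * total:]
    params, offset = [], 0
    for i in range(total):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        key = data[offset:offset + ksize]
        offset += ksize
        value = data[offset:offset + vsize]
        offset += vsize
        # mandatory parameters come first, then the advisory ones
        params.append((key, value, i < mancount))
    return params
```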

parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator
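The case rule described under "Bundle processing" in the module docstring (handler matching is case insensitive; any uppercase character makes a part mandatory) can be stated as a tiny sketch. `partismandatory` and `handlerkey` are hypothetical helpers, not functions from this module:

```python
def partismandatory(parttype):
    # a part is mandatory when its type contains any uppercase character
    return parttype != parttype.lower()

def handlerkey(parttype):
    # handler lookup itself is case insensitive (cf. parthandler above)
    return parttype.lower()
```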

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`. Where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__

class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.gettransaction = transactiongetter
        self.reply = None
        self.captureoutput = captureoutput

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op.gettransaction is None or op.gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except Exception as exc:
        # Any exceptions seeking to the end of the bundle at this point are
        # almost certainly related to the underlying stream being bad.
        # And, chances are that the exception we're handling is related to
        # getting in that bad state. So, we swallow the seeking error and
        # re-raise the original error.
        seekerror = False
        try:
            for nbpart, part in iterparts:
                # consume the bundle content
                part.seek(0, 2)
        except Exception:
            seekerror = True

        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from processing the old format. This is mostly
        # needed to handle different return codes to unbundle according to the
        # type of bundle. We should probably clean up or drop this return code
        # craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged

        # Re-raising from a variable loses the original stack. So only use
        # that form if we need to.
        if seekerror:
            raise exc
        else:
            raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op

def _processchangegroup(op, cg, tr, source, url, **kwargs):
    ret, addednodes = cg.apply(op.repo, tr, source, url, **kwargs)
    op.records.add('changegroup', {
        'return': ret,
        'addednodes': addednodes,
    })
    return ret

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    hardabort = False
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                      params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam('in-reply-to', str(part.id), mandatory=False)
    # If exiting or interrupted, do not attempt to seek the stream in the
    # finally block below. This makes abort faster.
    except (SystemExit, KeyboardInterrupt):
        hardabort = True
        raise
    finally:
        # consume the part content to not corrupt the stream.
        if not hardabort:
            part.seek(0, 2)
483
483
484 def decodecaps(blob):
484 def decodecaps(blob):
485 """decode a bundle2 caps bytes blob into a dictionary
485 """decode a bundle2 caps bytes blob into a dictionary
486
486
487 The blob is a list of capabilities (one per line)
487 The blob is a list of capabilities (one per line)
488 Capabilities may have values using a line of the form::
488 Capabilities may have values using a line of the form::
489
489
490 capability=value1,value2,value3
490 capability=value1,value2,value3
491
491
492 The values are always a list."""
492 The values are always a list."""
493 caps = {}
493 caps = {}
494 for line in blob.splitlines():
494 for line in blob.splitlines():
495 if not line:
495 if not line:
496 continue
496 continue
497 if '=' not in line:
497 if '=' not in line:
498 key, vals = line, ()
498 key, vals = line, ()
499 else:
499 else:
500 key, vals = line.split('=', 1)
500 key, vals = line.split('=', 1)
501 vals = vals.split(',')
501 vals = vals.split(',')
502 key = urlreq.unquote(key)
502 key = urlreq.unquote(key)
503 vals = [urlreq.unquote(v) for v in vals]
503 vals = [urlreq.unquote(v) for v in vals]
504 caps[key] = vals
504 caps[key] = vals
505 return caps
505 return caps
506
506
507 def encodecaps(caps):
507 def encodecaps(caps):
508 """encode a bundle2 caps dictionary into a bytes blob"""
508 """encode a bundle2 caps dictionary into a bytes blob"""
509 chunks = []
509 chunks = []
510 for ca in sorted(caps):
510 for ca in sorted(caps):
511 vals = caps[ca]
511 vals = caps[ca]
512 ca = urlreq.quote(ca)
512 ca = urlreq.quote(ca)
513 vals = [urlreq.quote(v) for v in vals]
513 vals = [urlreq.quote(v) for v in vals]
514 if vals:
514 if vals:
515 ca = "%s=%s" % (ca, ','.join(vals))
515 ca = "%s=%s" % (ca, ','.join(vals))
516 chunks.append(ca)
516 chunks.append(ca)
517 return '\n'.join(chunks)
517 return '\n'.join(chunks)
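# Illustration only (not part of the module): the caps blob round-trips
# through encodecaps/decodecaps, one capability per line, values
# comma-separated, every token URL-quoted:
#
#   >>> encodecaps({'HG20': [], 'changegroup': ['01', '02']})
#   'HG20\nchangegroup=01,02'
#   >>> decodecaps('HG20\nchangegroup=01,02') == {'HG20': [],
#   ...                                           'changegroup': ['01', '02']}
#   True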

bundletypes = {
    "": ("", 'UN'), # only when using unbundle on ssh and old http servers
                    # since the unification ssh accepts a header but there
                    # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` will result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding one if you
        need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)
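    # For example (illustration only), a bundler whose _params list holds
    # [('Compression', 'BZ'), ('e|vil', None)] would emit the blob
    # 'Compression=BZ e%7Cvil': parameters are space-separated, and each
    # name (and optional '=value') is URL-quoted so that spaces and '='
    # survive the encoding.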

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged
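    # Typical use of this class, sketched for illustration (the part type
    # and payload below are hypothetical examples, not a prescribed API):
    #
    #   bundler = bundle20(ui)
    #   bundler.newpart('output', data='hello', mandatory=False)
    #   binary = ''.join(bundler.getchunks())
    #
    # Real callers pick a part type registered with a handler on the
    # receiving side.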


class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process and decode a space-separated block of stream parameters"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will
        be ignored when unknown. Those starting with an upper case letter
        are mandatory, and this function will raise a KeyError when they
        are unknown.

        Note: no options are currently supported. Any input will either be
        ignored or fail.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends, relying on the bundle
        being interpreted to know its end. This is terrible and we are sorry,
        but we needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
            assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)
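    # Framing consumed by the loop above, for illustration: every chunk is
    # prefixed by its size (a big-endian int32). A zero size terminates the
    # current sequence, two consecutive zero sizes mark the end of the
    # bundle, and the special flaginterrupt size introduces an out-of-band
    # interrupt part whose payload is forwarded without further handling.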


    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True

class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. The remote side
    should be able to safely ignore the advisory ones.

    Neither data nor parameters can be modified after generation has begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
                % (cls, id(self), self.id, self.type, self.mandatory))

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))
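    # Illustration (hypothetical parameter names and values): an advisory
    # parameter is one the receiver may silently drop, while a mandatory
    # one makes unbundling abort on a receiver that does not claim support
    # for it.
    #
    #   part.addparam('in-reply-to', '4', mandatory=False)  # advisory
    #   part.addparam('version', '02')                      # mandatory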
946
946
947 # methods used to generates the bundle2 stream
947 # methods used to generates the bundle2 stream
948 def getchunks(self, ui):
948 def getchunks(self, ui):
949 if self._generated is not None:
949 if self._generated is not None:
950 raise error.ProgrammingError('part can only be consumed once')
950 raise error.ProgrammingError('part can only be consumed once')
951 self._generated = False
951 self._generated = False
952
952
        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % exc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % exc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

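The header layout emitted by getchunks() above can be sketched standalone. This is a minimal illustration, not the real implementation: the format strings mirror bundle2's constants (_fparttypesize = '>B', _fpartid = '>I', _fpartparamcount = '>BB', and a 'BB' pair per parameter for sizes), which are defined outside this excerpt, and the part values are invented.

```python
import struct

def packpartheader(parttype, partid, manpar, advpar):
    # type length (1 byte), type string, part id (4 bytes)
    header = [struct.pack('>B', len(parttype)), parttype,
              struct.pack('>I', partid),
              # mandatory/advisory parameter counts (1 byte each)
              struct.pack('>BB', len(manpar), len(advpar))]
    params = manpar + advpar
    sizes = []
    for key, value in params:
        sizes.append(len(key))
        sizes.append(len(value))
    # one (key size, value size) byte pair per parameter
    header.append(struct.pack('>' + 'BB' * len(params), *sizes))
    # then the raw key/value bytes, mandatory params first
    for key, value in params:
        header.append(key)
        header.append(value)
    return b''.join(header)

hdr = packpartheader(b'changegroup', 0, [(b'version', b'02')], [])
# 1 + 11 + 4 + 2 + 2 + 7 + 2 == 29 bytes total
```

The real getchunks() additionally prefixes this blob with its length (_fpartheadersize) and follows it with the framed payload chunks.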
    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods used to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if util.safehasattr(self.data, 'next'):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


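The streaming branch of _payloadchunks() above re-cuts an incoming iterator of bytes into chunks of a preferred size. A minimal sketch of that re-chunking, with a toy chunk size of 4 instead of bundle2's preferedchunksize:

```python
def chunked(data, size=4):
    # cut a byte string into fixed-size chunks; the last one may be short
    for i in range(0, len(data), size):
        yield data[i:i + size]

chunks = list(chunked(b'abcdefghij'))
# chunks == [b'abcd', b'efgh', b'ij']
```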
flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting exceptions raised on the producer side while the
    consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] # (payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to setup all logic related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos, self._tellfp()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

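The linear scan in _findchunk() can be illustrated standalone. The chunk index below is invented: each entry is a (payload offset, file offset) tuple for a chunk start, as built up by _payloadchunks().

```python
# hypothetical index: chunk payload starts at 0, 4096, 8192
chunkindex = [(0, 100), (4096, 4204), (8192, 8308)]

def findchunk(pos):
    # map a payload position to (chunk number, offset within that chunk)
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0
        elif ppos > pos:
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')

# payload position 5000 falls 904 bytes into chunk 1
```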
    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % self.id)
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of pairs again
        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param values
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode', 'strict')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

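A quick illustration of obsmarkersversion() on a hypothetical capabilities dict: entries like 'V0'/'V1' are parsed into integers, and anything not starting with 'V' is ignored.

```python
# invented caps dict, shaped like the output of decodecaps()
caps = {'obsmarkers': ('V0', 'V1', 'X2')}
obscaps = caps.get('obsmarkers', ())
versions = [int(c[1:]) for c in obscaps if c.startswith('V')]
# versions == [0, 1]; 'X2' does not match the 'V' prefix
```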
def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The types of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now, so we keep them separated for the sake of
    # simplicity.

    # we always want a changegroup in such a bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.getchangegroup(repo, source, outgoing,
                                    version=cgversion)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        part.addparam('nbchanges', str(cg.extras['clcount']),
                      mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = []
        for phase in phases.allphases:
            for head in headsbyphase[phase]:
                phasedata.append(_pack(_fphasesentry, phase, head))
        bundler.newpart('phase-heads', data=''.join(phasedata))

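Each 'phase-heads' entry packed above is a fixed-size record: bundle2 defines _fphasesentry as '>i20s' (a big-endian int phase followed by a 20-byte node hash). That constant is defined outside this excerpt, so treat it as an assumption here; the node value below is invented.

```python
import struct

node = b'\x11' * 20           # hypothetical 20-byte changeset node
entry = struct.pack('>i20s', 1, node)   # phase 1 == draft
assert len(entry) == 24       # 4-byte phase + 20-byte node
phase, unpackednode = struct.unpack('>i20s', entry)
```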
def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changesets
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker
    format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', str(cg.extras['clcount']),
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only support v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        # addchangegroup encodes head changes as 1 + added heads or
        # -1 - removed heads; accumulate the deltas here.
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will see massive rework before
    being inflicted on any end-user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the ones contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
    ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given a url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate what was
        retrieved by the client matches the server knowledge about the bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest with
        that name. Like the size, it is used to validate what was retrieved by
        the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually updated
    during the push. If other activities happen on unrelated heads, they are
    ignored.

    This allows servers with high traffic to avoid push contention as long as
    only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

def _readphaseheads(inpart):
    headsbyphase = [[] for i in phases.allphases]
    entrysize = struct.calcsize(_fphasesentry)
    while True:
        entry = inpart.read(entrysize)
        if len(entry) < entrysize:
            if entry:
                raise error.Abort(_('bad phase-heads bundle part'))
            break
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
    return headsbyphase

@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = _readphaseheads(inpart)
    addednodes = []
    for entry in op.records['changegroup']:
        addednodes.extend(entry['addednodes'])
    phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase,
                        addednodes)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not '
                         'enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)