merge with stable
Augie Fackler
r32386:655f1e2c merge default
@@ -1,1731 +1,1732 @@
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application-agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows:

- magic string
- stream level parameters
- payload parts (any number)
- end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

The binary format is as follows:

:params size: int32

  The total number of bytes used by the parameters.

:params value: arbitrary number of bytes

  A blob of `params size` containing the serialized version of all stream
  level parameters.

  The blob contains a space separated list of parameters. Parameters with a
  value are stored in the form `<name>=<value>`. Both name and value are
  urlquoted.

  Empty names are forbidden.

  Names MUST start with a letter. If the first letter is lower case, the
  parameter is advisory and can be safely ignored. However, when the first
  letter is capital, the parameter is mandatory and the bundling process MUST
  stop if it is unable to process it.

  Stream parameters use a simple textual format for two main reasons:

  - Stream level parameters should remain simple and we want to discourage
    any crazy usage.
  - Textual data allows easy human inspection of a bundle2 header in case of
    trouble.

  Any application level options MUST go into a bundle2 part instead.

Payload part
------------------------

The binary format is as follows:

:header size: int32

  The total number of bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

    The header defines how to interpret the part. It contains two pieces of
    data: the part type, and the part parameters.

    The part type is used to route to an application level handler that can
    interpret the payload.

    Part parameters are passed to the application level handler. They are
    meant to convey information that will help the application level handler
    interpret the part payload.

    The binary format of the header is as follows:

    :typesize: (one byte)

    :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

    :partid: A 32-bit integer (unique in the bundle) that can be used to
             refer to this part.

    :parameters:

        A part's parameters may have arbitrary content, the binary structure
        is::

            <mandatory-count><advisory-count><param-sizes><param-data>

        :mandatory-count: 1 byte, number of mandatory parameters

        :advisory-count: 1 byte, number of advisory parameters

        :param-sizes:

            N pairs of bytes, where N is the total number of parameters. Each
            pair contains (<size-of-key>, <size-of-value>) for one parameter.

        :param-data:

            A blob of bytes from which each parameter key and value can be
            retrieved using the list of size pairs stored in the previous
            field.

        Mandatory parameters come first, then the advisory ones.

        Each parameter's key MUST be unique within the part.

:payload:

    The payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part
type contains any uppercase character it is considered mandatory. When no
handler is known for a mandatory part, the process is aborted and an
exception is raised. If the part is advisory and no handler is known, the
part is ignored. When the process is aborted, the full bundle is still read
from the stream to keep the channel usable. But none of the parts read after
an abort are processed. In the future, dropping the stream may become an
option for channels we do not care to preserve.
"""

from __future__ import absolute_import

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    pushkey,
    pycompat,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
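# Illustrative sketch (not part of the original module): how the struct
# formats above map onto the stream layout described in the module docstring.
# The helper name `_example_emptybundle` is hypothetical; it builds the
# smallest valid HG20 stream: the magic string, a zero-length stream
# parameter blob, and the end-of-stream marker (an empty part header).
def _example_emptybundle():
    chunks = [
        'HG20',                       # magic string
        _pack(_fstreamparamsize, 0),  # no stream level parameters
        _pack(_fpartheadersize, 0),   # empty header == end of stream marker
    ]
    return ''.join(chunks)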
def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>'+('BB'*nbparams)
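# Illustrative sketch (not part of the original module): what the dynamic
# format built by _makefpartparamsizes looks like in practice. With two
# parameters the format is '>BBBB', one (key size, value size) byte pair per
# parameter, followed by the concatenated key/value blob. The parameter
# names below are only sample values.
def _example_packparamsizes():
    params = [('version', '02'), ('nbchanges', '12')]
    fmt = _makefpartparamsizes(len(params))  # '>BBBB'
    sizes = _pack(fmt, *[len(x) for pair in params for x in pair])
    blob = ''.join(key + value for key, value in params)
    return sizes + blob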
parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator
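# Illustrative sketch (not part of the original module): the shape of a
# handler registered with @parthandler. The part type 'example:noop' and the
# handler body are hypothetical; real handlers (changegroup, pushkey, ...)
# read the part payload and record results on the operation object.
@parthandler('example:noop', ('reason',))
def _examplenoophandler(op, part):
    """consume the part payload and record why it was sent"""
    data = part.read()  # drain the payload
    op.records.add('example:noop', {'reason': part.params.get('reason'),
                                    'size': len(data)})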
class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__
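# Illustrative sketch (not part of the original module): typical use of
# unbundlerecords from a part handler's point of view. The category name
# 'changegroup' matches what the real changegroup handler records; the
# payload dicts here are made up.
def _example_records():
    records = unbundlerecords()
    records.add('changegroup', {'return': 1})
    records.add('changegroup', {'return': 0}, inreplyto=3)
    assert records['changegroup'] == ({'return': 1}, {'return': 0})
    assert list(records.getreplies(3)) == [('changegroup', {'return': 0})]
    return records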
class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.gettransaction = transactiongetter
        self.reply = None
        self.captureoutput = captureoutput

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    tr.hookargs['bundle2'] = '1'
    if source is not None and 'source' not in tr.hookargs:
        tr.hookargs['source'] = source
    if url is not None and 'url' not in tr.hookargs:
        tr.hookargs['url'] = url
    return processbundle(repo, unbundler, lambda: tr, op=op)

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying its effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op.gettransaction is None:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except Exception as exc:
        # Any exceptions seeking to the end of the bundle at this point are
        # almost certainly related to the underlying stream being bad.
        # And, chances are that the exception we're handling is related to
        # getting in that bad state. So, we swallow the seeking error and
        # re-raise the original error.
        seekerror = False
        try:
            for nbpart, part in iterparts:
                # consume the bundle content
                part.seek(0, 2)
        except Exception:
            seekerror = True

        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from processing the old format. This is mostly
        # needed to handle different return codes to unbundle according to the
        # type of bundle. We should probably clean up or drop this return code
        # craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged

        # Re-raising from a variable loses the original stack. So only use
        # that form if we need to.
        if seekerror:
            raise exc
        else:
            raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op
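# Illustrative sketch (not part of the original module): how a caller might
# feed an incoming bundle2 stream to applybundle. The transaction handling is
# simplified; real callers (e.g. exchange.unbundle) choose the transaction
# name and hook arguments to match the operation being performed.
def _example_applybundle(repo, fp):
    unbundler = getunbundler(repo.ui, fp)
    tr = repo.transaction('unbundle')
    try:
        op = applybundle(repo, unbundler, tr, source='push')
        tr.close()
    finally:
        tr.release()
    return op.records['changegroup']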
def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    hardabort = False
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                      params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam('in-reply-to', str(part.id), mandatory=False)
    # If exiting or interrupted, do not attempt to seek the stream in the
    # finally block below. This makes abort faster.
    except (SystemExit, KeyboardInterrupt):
        hardabort = True
        raise
    finally:
        # consume the part content to not corrupt the stream.
        if not hardabort:
            part.seek(0, 2)


def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)
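# Illustrative sketch (not part of the original module): encodecaps and
# decodecaps round-trip a capabilities dictionary through the one-per-line
# wire representation. The capability names below are only sample values.
def _example_capsroundtrip():
    caps = {'HG20': [], 'changegroup': ['01', '02']}
    blob = encodecaps(caps)
    # blob == 'HG20\nchangegroup=01,02'
    assert decodecaps(blob) == {'HG20': [], 'changegroup': ['01', '02']}
    return blob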
bundletypes = {
    "": ("", 'UN'),       # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual application payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means that
        any failure to properly initialize the part after calling ``newpart``
        should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding a part if you
        need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we preserve
        server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged
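# Illustrative sketch (not part of the original module): assembling an
# outgoing bundle with bundle20 and writing it to a file object. The 'output'
# part and its payload are only sample values; real callers such as
# exchange.getbundle add changegroup, listkeys and similar parts instead.
def _example_writebundle(ui, fp):
    bundler = bundle20(ui)
    bundler.setcompression('GZ')
    part = bundler.newpart('output', data='hello', mandatory=False)
    part.addparam('in-reply-to', '0', mandatory=False)
    for chunk in bundler.getchunks():
        fp.write(chunk)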
class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler
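# Illustrative sketch (not part of the original module): reading a bundle2
# file and walking its parts with the unbundler returned by getunbundler.
# Printing the part types is made up; real callers hand the parts to
# processbundle() / _processpart() instead of inspecting them directly.
def _example_listparts(ui, path):
    with open(path, 'rb') as fp:
        unbundler = getunbundler(ui, fp)
        for part in unbundler.iterparts():
            ui.write('%s (mandatory: %s)\n' % (part.type, part.mandatory))
            part.seek(0, 2)  # drain the payload before the next part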
class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` methods."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process and decode a raw stream level parameter block"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. For those starting with an upper case letter,
        which are mandatory, this function will raise a KeyError when unknown.

        Note: no options are currently supported. Any input will either be
        ignored or fail.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends, relying on the bundle
        being interpreted to know its end. This is terrible and we are sorry,
        but we needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)


    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure param have been loaded
        self.params
        # From there, payload need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """reads a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True
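# Illustrative sketch (not part of the original module): what registering an
# additional stream level parameter handler looks like. The parameter name
# 'example' is hypothetical; as the module docstring notes, lower-case
# (advisory) parameters that remain unknown on the receiving side are simply
# ignored.
@b2streamparamhandler('example')
def _processexample(unbundler, param, value):
    """record an advisory stream parameter for debugging purposes"""
    indebug(unbundler.ui, 'saw example stream parameter: %r' % value)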
837 class bundlepart(object):
837 class bundlepart(object):
838 """A bundle2 part contains application level payload
838 """A bundle2 part contains application level payload
839
839
840 The part `type` is used to route the part to the application level
840 The part `type` is used to route the part to the application level
841 handler.
841 handler.
842
842
843 The part payload is contained in ``part.data``. It could be raw bytes or a
843 The part payload is contained in ``part.data``. It could be raw bytes or a
844 generator of byte chunks.
844 generator of byte chunks.
845
845
846 You can add parameters to the part using the ``addparam`` method.
846 You can add parameters to the part using the ``addparam`` method.
847 Parameters can be either mandatory (default) or advisory. Remote side
847 Parameters can be either mandatory (default) or advisory. Remote side
848 should be able to safely ignore the advisory ones.
848 should be able to safely ignore the advisory ones.
849
849
850 Both data and parameters cannot be modified after the generation has begun.
850 Both data and parameters cannot be modified after the generation has begun.
851 """
851 """
852
852
853 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
853 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
854 data='', mandatory=True):
854 data='', mandatory=True):
855 validateparttype(parttype)
855 validateparttype(parttype)
856 self.id = None
856 self.id = None
857 self.type = parttype
857 self.type = parttype
858 self._data = data
858 self._data = data
859 self._mandatoryparams = list(mandatoryparams)
859 self._mandatoryparams = list(mandatoryparams)
860 self._advisoryparams = list(advisoryparams)
860 self._advisoryparams = list(advisoryparams)
861 # checking for duplicated entries
861 # checking for duplicated entries
862 self._seenparams = set()
862 self._seenparams = set()
863 for pname, __ in self._mandatoryparams + self._advisoryparams:
863 for pname, __ in self._mandatoryparams + self._advisoryparams:
864 if pname in self._seenparams:
864 if pname in self._seenparams:
865 raise error.ProgrammingError('duplicated params: %s' % pname)
865 raise error.ProgrammingError('duplicated params: %s' % pname)
866 self._seenparams.add(pname)
866 self._seenparams.add(pname)
867 # status of the part's generation:
867 # status of the part's generation:
868 # - None: not started,
868 # - None: not started,
869 # - False: currently being generated,
869 # - False: currently being generated,
870 # - True: generation done.
870 # - True: generation done.
871 self._generated = None
871 self._generated = None
872 self.mandatory = mandatory
872 self.mandatory = mandatory
873
873
874 def __repr__(self):
874 def __repr__(self):
875 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
875 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
876 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
876 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
877 % (cls, id(self), self.id, self.type, self.mandatory))
877 % (cls, id(self), self.id, self.type, self.mandatory))
878
878
879 def copy(self):
879 def copy(self):
880 """return a copy of the part
880 """return a copy of the part
881
881
882 The new part has the very same content but no partid assigned yet.
882 The new part has the very same content but no partid assigned yet.
883 Parts with generated data cannot be copied."""
883 Parts with generated data cannot be copied."""
884 assert not util.safehasattr(self.data, 'next')
884 assert not util.safehasattr(self.data, 'next')
885 return self.__class__(self.type, self._mandatoryparams,
885 return self.__class__(self.type, self._mandatoryparams,
886 self._advisoryparams, self._data, self.mandatory)
886 self._advisoryparams, self._data, self.mandatory)
887
887
888 # methods used to define the part content
888 # methods used to define the part content
889 @property
889 @property
890 def data(self):
890 def data(self):
891 return self._data
891 return self._data
892
892
893 @data.setter
893 @data.setter
894 def data(self, data):
894 def data(self, data):
895 if self._generated is not None:
895 if self._generated is not None:
896 raise error.ReadOnlyPartError('part is being generated')
896 raise error.ReadOnlyPartError('part is being generated')
897 self._data = data
897 self._data = data
898
898
899 @property
899 @property
900 def mandatoryparams(self):
900 def mandatoryparams(self):
901 # make it an immutable tuple to force people through ``addparam``
901 # make it an immutable tuple to force people through ``addparam``
902 return tuple(self._mandatoryparams)
902 return tuple(self._mandatoryparams)
903
903
904 @property
904 @property
905 def advisoryparams(self):
905 def advisoryparams(self):
906 # make it an immutable tuple to force people through ``addparam``
906 # make it an immutable tuple to force people through ``addparam``
907 return tuple(self._advisoryparams)
907 return tuple(self._advisoryparams)
908
908
909 def addparam(self, name, value='', mandatory=True):
909 def addparam(self, name, value='', mandatory=True):
910 """add a parameter to the part
910 """add a parameter to the part
911
911
912 If 'mandatory' is set to True, the remote handler must claim support
912 If 'mandatory' is set to True, the remote handler must claim support
913 for this parameter or the unbundling will be aborted.
913 for this parameter or the unbundling will be aborted.
914
914
915 The 'name' and 'value' cannot exceed 255 bytes each.
915 The 'name' and 'value' cannot exceed 255 bytes each.
916 """
916 """
917 if self._generated is not None:
917 if self._generated is not None:
918 raise error.ReadOnlyPartError('part is being generated')
918 raise error.ReadOnlyPartError('part is being generated')
919 if name in self._seenparams:
919 if name in self._seenparams:
920 raise ValueError('duplicated params: %s' % name)
920 raise ValueError('duplicated params: %s' % name)
921 self._seenparams.add(name)
921 self._seenparams.add(name)
922 params = self._advisoryparams
922 params = self._advisoryparams
923 if mandatory:
923 if mandatory:
924 params = self._mandatoryparams
924 params = self._mandatoryparams
925 params.append((name, value))
925 params.append((name, value))
926
926
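# --- Editor's illustration (not part of bundle2.py) -----------------------
# A minimal sketch of building a part by hand with the class above. The
# parameter names ('version', 'nbchanges') mirror the ones used for the
# changegroup part elsewhere in this module; 'somepayload' is made up.
def _examplebuildpart():
    part = bundlepart('changegroup', data='somepayload')
    part.addparam('version', '02')                    # mandatory by default
    part.addparam('nbchanges', '3', mandatory=False)  # advisory
    # once getchunks() has started, data and params are frozen:
    # a further part.addparam(...) would raise error.ReadOnlyPartError
    return part
# ---------------------------------------------------------------------------
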
927 # methods used to generate the bundle2 stream
927 # methods used to generate the bundle2 stream
928 def getchunks(self, ui):
928 def getchunks(self, ui):
929 if self._generated is not None:
929 if self._generated is not None:
930 raise error.ProgrammingError('part can only be consumed once')
930 raise error.ProgrammingError('part can only be consumed once')
931 self._generated = False
931 self._generated = False
932
932
933 if ui.debugflag:
933 if ui.debugflag:
934 msg = ['bundle2-output-part: "%s"' % self.type]
934 msg = ['bundle2-output-part: "%s"' % self.type]
935 if not self.mandatory:
935 if not self.mandatory:
936 msg.append(' (advisory)')
936 msg.append(' (advisory)')
937 nbmp = len(self.mandatoryparams)
937 nbmp = len(self.mandatoryparams)
938 nbap = len(self.advisoryparams)
938 nbap = len(self.advisoryparams)
939 if nbmp or nbap:
939 if nbmp or nbap:
940 msg.append(' (params:')
940 msg.append(' (params:')
941 if nbmp:
941 if nbmp:
942 msg.append(' %i mandatory' % nbmp)
942 msg.append(' %i mandatory' % nbmp)
943 if nbap:
943 if nbap:
944 msg.append(' %i advisory' % nbap)
944 msg.append(' %i advisory' % nbap)
945 msg.append(')')
945 msg.append(')')
946 if not self.data:
946 if not self.data:
947 msg.append(' empty payload')
947 msg.append(' empty payload')
948 elif util.safehasattr(self.data, 'next'):
948 elif util.safehasattr(self.data, 'next'):
949 msg.append(' streamed payload')
949 msg.append(' streamed payload')
950 else:
950 else:
951 msg.append(' %i bytes payload' % len(self.data))
951 msg.append(' %i bytes payload' % len(self.data))
952 msg.append('\n')
952 msg.append('\n')
953 ui.debug(''.join(msg))
953 ui.debug(''.join(msg))
954
954
955 #### header
955 #### header
956 if self.mandatory:
956 if self.mandatory:
957 parttype = self.type.upper()
957 parttype = self.type.upper()
958 else:
958 else:
959 parttype = self.type.lower()
959 parttype = self.type.lower()
960 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
960 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
961 ## parttype
961 ## parttype
962 header = [_pack(_fparttypesize, len(parttype)),
962 header = [_pack(_fparttypesize, len(parttype)),
963 parttype, _pack(_fpartid, self.id),
963 parttype, _pack(_fpartid, self.id),
964 ]
964 ]
965 ## parameters
965 ## parameters
966 # count
966 # count
967 manpar = self.mandatoryparams
967 manpar = self.mandatoryparams
968 advpar = self.advisoryparams
968 advpar = self.advisoryparams
969 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
969 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
970 # size
970 # size
971 parsizes = []
971 parsizes = []
972 for key, value in manpar:
972 for key, value in manpar:
973 parsizes.append(len(key))
973 parsizes.append(len(key))
974 parsizes.append(len(value))
974 parsizes.append(len(value))
975 for key, value in advpar:
975 for key, value in advpar:
976 parsizes.append(len(key))
976 parsizes.append(len(key))
977 parsizes.append(len(value))
977 parsizes.append(len(value))
978 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
978 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
979 header.append(paramsizes)
979 header.append(paramsizes)
980 # key, value
980 # key, value
981 for key, value in manpar:
981 for key, value in manpar:
982 header.append(key)
982 header.append(key)
983 header.append(value)
983 header.append(value)
984 for key, value in advpar:
984 for key, value in advpar:
985 header.append(key)
985 header.append(key)
986 header.append(value)
986 header.append(value)
987 ## finalize header
987 ## finalize header
988 headerchunk = ''.join(header)
988 headerchunk = ''.join(header)
989 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
989 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
990 yield _pack(_fpartheadersize, len(headerchunk))
990 yield _pack(_fpartheadersize, len(headerchunk))
991 yield headerchunk
991 yield headerchunk
992 ## payload
992 ## payload
993 try:
993 try:
994 for chunk in self._payloadchunks():
994 for chunk in self._payloadchunks():
995 outdebug(ui, 'payload chunk size: %i' % len(chunk))
995 outdebug(ui, 'payload chunk size: %i' % len(chunk))
996 yield _pack(_fpayloadsize, len(chunk))
996 yield _pack(_fpayloadsize, len(chunk))
997 yield chunk
997 yield chunk
998 except GeneratorExit:
998 except GeneratorExit:
999 # GeneratorExit means that nobody is listening for our
999 # GeneratorExit means that nobody is listening for our
1000 # results anyway, so just bail quickly rather than trying
1000 # results anyway, so just bail quickly rather than trying
1001 # to produce an error part.
1001 # to produce an error part.
1002 ui.debug('bundle2-generatorexit\n')
1002 ui.debug('bundle2-generatorexit\n')
1003 raise
1003 raise
1004 except BaseException as exc:
1004 except BaseException as exc:
1005 # backup exception data for later
1005 # backup exception data for later
1006 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1006 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1007 % exc)
1007 % exc)
1008 tb = sys.exc_info()[2]
1008 tb = sys.exc_info()[2]
1009 msg = 'unexpected error: %s' % exc
1009 msg = 'unexpected error: %s' % exc
1010 interpart = bundlepart('error:abort', [('message', msg)],
1010 interpart = bundlepart('error:abort', [('message', msg)],
1011 mandatory=False)
1011 mandatory=False)
1012 interpart.id = 0
1012 interpart.id = 0
1013 yield _pack(_fpayloadsize, -1)
1013 yield _pack(_fpayloadsize, -1)
1014 for chunk in interpart.getchunks(ui=ui):
1014 for chunk in interpart.getchunks(ui=ui):
1015 yield chunk
1015 yield chunk
1016 outdebug(ui, 'closing payload chunk')
1016 outdebug(ui, 'closing payload chunk')
1017 # abort current part payload
1017 # abort current part payload
1018 yield _pack(_fpayloadsize, 0)
1018 yield _pack(_fpayloadsize, 0)
1019 pycompat.raisewithtb(exc, tb)
1019 pycompat.raisewithtb(exc, tb)
1020 # end of payload
1020 # end of payload
1021 outdebug(ui, 'closing payload chunk')
1021 outdebug(ui, 'closing payload chunk')
1022 yield _pack(_fpayloadsize, 0)
1022 yield _pack(_fpayloadsize, 0)
1023 self._generated = True
1023 self._generated = True
1024
1024
1025 def _payloadchunks(self):
1025 def _payloadchunks(self):
1026 """yield chunks of a the part payload
1026 """yield chunks of a the part payload
1027
1027
1028 Exists to handle the different methods to provide data to a part."""
1028 Exists to handle the different methods to provide data to a part."""
1029 # we only support fixed size data now.
1029 # we only support fixed size data now.
1030 # This will be improved in the future.
1030 # This will be improved in the future.
1031 if util.safehasattr(self.data, 'next'):
1031 if util.safehasattr(self.data, 'next'):
1032 buff = util.chunkbuffer(self.data)
1032 buff = util.chunkbuffer(self.data)
1033 chunk = buff.read(preferedchunksize)
1033 chunk = buff.read(preferedchunksize)
1034 while chunk:
1034 while chunk:
1035 yield chunk
1035 yield chunk
1036 chunk = buff.read(preferedchunksize)
1036 chunk = buff.read(preferedchunksize)
1037 elif len(self.data):
1037 elif len(self.data):
1038 yield self.data
1038 yield self.data
1039
1039
1040
1040
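# --- Editor's illustration (not part of bundle2.py) -----------------------
# A minimal sketch of the framing produced by bundlepart.getchunks() above:
# a part is an int32 header size, the header blob, then a sequence of
# (int32 size, payload chunk) pairs closed by a zero size. The arguments are
# made up; the struct formats are the module-level ones used above.
def _exampleframepart(headerchunk, payloadchunks):
    yield _pack(_fpartheadersize, len(headerchunk))
    yield headerchunk
    for chunk in payloadchunks:
        yield _pack(_fpayloadsize, len(chunk))
        yield chunk
    yield _pack(_fpayloadsize, 0)  # end-of-payload marker
# ---------------------------------------------------------------------------
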
1041 flaginterrupt = -1
1041 flaginterrupt = -1
1042
1042
1043 class interrupthandler(unpackermixin):
1043 class interrupthandler(unpackermixin):
1044 """read one part and process it with restricted capability
1044 """read one part and process it with restricted capability
1045
1045
1046 This allows transmitting an exception raised on the producer side during part
1046 This allows transmitting an exception raised on the producer side during part
1047 iteration while the consumer is reading a part.
1047 iteration while the consumer is reading a part.
1048
1048
1049 Parts processed in this manner only have access to a ui object."""
1049 Parts processed in this manner only have access to a ui object."""
1050
1050
1051 def __init__(self, ui, fp):
1051 def __init__(self, ui, fp):
1052 super(interrupthandler, self).__init__(fp)
1052 super(interrupthandler, self).__init__(fp)
1053 self.ui = ui
1053 self.ui = ui
1054
1054
1055 def _readpartheader(self):
1055 def _readpartheader(self):
1056 """reads a part header size and return the bytes blob
1056 """reads a part header size and return the bytes blob
1057
1057
1058 returns None if empty"""
1058 returns None if empty"""
1059 headersize = self._unpack(_fpartheadersize)[0]
1059 headersize = self._unpack(_fpartheadersize)[0]
1060 if headersize < 0:
1060 if headersize < 0:
1061 raise error.BundleValueError('negative part header size: %i'
1061 raise error.BundleValueError('negative part header size: %i'
1062 % headersize)
1062 % headersize)
1063 indebug(self.ui, 'part header size: %i\n' % headersize)
1063 indebug(self.ui, 'part header size: %i\n' % headersize)
1064 if headersize:
1064 if headersize:
1065 return self._readexact(headersize)
1065 return self._readexact(headersize)
1066 return None
1066 return None
1067
1067
1068 def __call__(self):
1068 def __call__(self):
1069
1069
1070 self.ui.debug('bundle2-input-stream-interrupt:'
1070 self.ui.debug('bundle2-input-stream-interrupt:'
1071 ' opening out of band context\n')
1071 ' opening out of band context\n')
1072 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1072 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1073 headerblock = self._readpartheader()
1073 headerblock = self._readpartheader()
1074 if headerblock is None:
1074 if headerblock is None:
1075 indebug(self.ui, 'no part found during interruption.')
1075 indebug(self.ui, 'no part found during interruption.')
1076 return
1076 return
1077 part = unbundlepart(self.ui, headerblock, self._fp)
1077 part = unbundlepart(self.ui, headerblock, self._fp)
1078 op = interruptoperation(self.ui)
1078 op = interruptoperation(self.ui)
1079 _processpart(op, part)
1079 _processpart(op, part)
1080 self.ui.debug('bundle2-input-stream-interrupt:'
1080 self.ui.debug('bundle2-input-stream-interrupt:'
1081 ' closing out of band context\n')
1081 ' closing out of band context\n')
1082
1082
1083 class interruptoperation(object):
1083 class interruptoperation(object):
1084 """A limited operation to be use by part handler during interruption
1084 """A limited operation to be use by part handler during interruption
1085
1085
1086 It only have access to an ui object.
1086 It only have access to an ui object.
1087 """
1087 """
1088
1088
1089 def __init__(self, ui):
1089 def __init__(self, ui):
1090 self.ui = ui
1090 self.ui = ui
1091 self.reply = None
1091 self.reply = None
1092 self.captureoutput = False
1092 self.captureoutput = False
1093
1093
1094 @property
1094 @property
1095 def repo(self):
1095 def repo(self):
1096 raise error.ProgrammingError('no repo access from stream interruption')
1096 raise error.ProgrammingError('no repo access from stream interruption')
1097
1097
1098 def gettransaction(self):
1098 def gettransaction(self):
1099 raise TransactionUnavailable('no repo access from stream interruption')
1099 raise TransactionUnavailable('no repo access from stream interruption')
1100
1100
1101 class unbundlepart(unpackermixin):
1101 class unbundlepart(unpackermixin):
1102 """a bundle part read from a bundle"""
1102 """a bundle part read from a bundle"""
1103
1103
1104 def __init__(self, ui, header, fp):
1104 def __init__(self, ui, header, fp):
1105 super(unbundlepart, self).__init__(fp)
1105 super(unbundlepart, self).__init__(fp)
1106 self._seekable = (util.safehasattr(fp, 'seek') and
1106 self._seekable = (util.safehasattr(fp, 'seek') and
1107 util.safehasattr(fp, 'tell'))
1107 util.safehasattr(fp, 'tell'))
1108 self.ui = ui
1108 self.ui = ui
1109 # unbundle state attr
1109 # unbundle state attr
1110 self._headerdata = header
1110 self._headerdata = header
1111 self._headeroffset = 0
1111 self._headeroffset = 0
1112 self._initialized = False
1112 self._initialized = False
1113 self.consumed = False
1113 self.consumed = False
1114 # part data
1114 # part data
1115 self.id = None
1115 self.id = None
1116 self.type = None
1116 self.type = None
1117 self.mandatoryparams = None
1117 self.mandatoryparams = None
1118 self.advisoryparams = None
1118 self.advisoryparams = None
1119 self.params = None
1119 self.params = None
1120 self.mandatorykeys = ()
1120 self.mandatorykeys = ()
1121 self._payloadstream = None
1121 self._payloadstream = None
1122 self._readheader()
1122 self._readheader()
1123 self._mandatory = None
1123 self._mandatory = None
1124 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1124 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1125 self._pos = 0
1125 self._pos = 0
1126
1126
1127 def _fromheader(self, size):
1127 def _fromheader(self, size):
1128 """return the next <size> byte from the header"""
1128 """return the next <size> byte from the header"""
1129 offset = self._headeroffset
1129 offset = self._headeroffset
1130 data = self._headerdata[offset:(offset + size)]
1130 data = self._headerdata[offset:(offset + size)]
1131 self._headeroffset = offset + size
1131 self._headeroffset = offset + size
1132 return data
1132 return data
1133
1133
1134 def _unpackheader(self, format):
1134 def _unpackheader(self, format):
1135 """read given format from header
1135 """read given format from header
1136
1136
1137 This automatically computes the size of the format to read."""
1137 This automatically computes the size of the format to read."""
1138 data = self._fromheader(struct.calcsize(format))
1138 data = self._fromheader(struct.calcsize(format))
1139 return _unpack(format, data)
1139 return _unpack(format, data)
1140
1140
1141 def _initparams(self, mandatoryparams, advisoryparams):
1141 def _initparams(self, mandatoryparams, advisoryparams):
1142 """internal function to setup all logic related parameters"""
1142 """internal function to setup all logic related parameters"""
1143 # make it read only to prevent people touching it by mistake.
1143 # make it read only to prevent people touching it by mistake.
1144 self.mandatoryparams = tuple(mandatoryparams)
1144 self.mandatoryparams = tuple(mandatoryparams)
1145 self.advisoryparams = tuple(advisoryparams)
1145 self.advisoryparams = tuple(advisoryparams)
1146 # user friendly UI
1146 # user friendly UI
1147 self.params = util.sortdict(self.mandatoryparams)
1147 self.params = util.sortdict(self.mandatoryparams)
1148 self.params.update(self.advisoryparams)
1148 self.params.update(self.advisoryparams)
1149 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1149 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1150
1150
1151 def _payloadchunks(self, chunknum=0):
1151 def _payloadchunks(self, chunknum=0):
1152 '''seek to specified chunk and start yielding data'''
1152 '''seek to specified chunk and start yielding data'''
1153 if len(self._chunkindex) == 0:
1153 if len(self._chunkindex) == 0:
1154 assert chunknum == 0, 'Must start with chunk 0'
1154 assert chunknum == 0, 'Must start with chunk 0'
1155 self._chunkindex.append((0, self._tellfp()))
1155 self._chunkindex.append((0, self._tellfp()))
1156 else:
1156 else:
1157 assert chunknum < len(self._chunkindex), \
1157 assert chunknum < len(self._chunkindex), \
1158 'Unknown chunk %d' % chunknum
1158 'Unknown chunk %d' % chunknum
1159 self._seekfp(self._chunkindex[chunknum][1])
1159 self._seekfp(self._chunkindex[chunknum][1])
1160
1160
1161 pos = self._chunkindex[chunknum][0]
1161 pos = self._chunkindex[chunknum][0]
1162 payloadsize = self._unpack(_fpayloadsize)[0]
1162 payloadsize = self._unpack(_fpayloadsize)[0]
1163 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1163 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1164 while payloadsize:
1164 while payloadsize:
1165 if payloadsize == flaginterrupt:
1165 if payloadsize == flaginterrupt:
1166 # interruption detection, the handler will now read a
1166 # interruption detection, the handler will now read a
1167 # single part and process it.
1167 # single part and process it.
1168 interrupthandler(self.ui, self._fp)()
1168 interrupthandler(self.ui, self._fp)()
1169 elif payloadsize < 0:
1169 elif payloadsize < 0:
1170 msg = 'negative payload chunk size: %i' % payloadsize
1170 msg = 'negative payload chunk size: %i' % payloadsize
1171 raise error.BundleValueError(msg)
1171 raise error.BundleValueError(msg)
1172 else:
1172 else:
1173 result = self._readexact(payloadsize)
1173 result = self._readexact(payloadsize)
1174 chunknum += 1
1174 chunknum += 1
1175 pos += payloadsize
1175 pos += payloadsize
1176 if chunknum == len(self._chunkindex):
1176 if chunknum == len(self._chunkindex):
1177 self._chunkindex.append((pos, self._tellfp()))
1177 self._chunkindex.append((pos, self._tellfp()))
1178 yield result
1178 yield result
1179 payloadsize = self._unpack(_fpayloadsize)[0]
1179 payloadsize = self._unpack(_fpayloadsize)[0]
1180 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1180 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1181
1181
1182 def _findchunk(self, pos):
1182 def _findchunk(self, pos):
1183 '''for a given payload position, return a chunk number and offset'''
1183 '''for a given payload position, return a chunk number and offset'''
1184 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1184 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1185 if ppos == pos:
1185 if ppos == pos:
1186 return chunk, 0
1186 return chunk, 0
1187 elif ppos > pos:
1187 elif ppos > pos:
1188 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1188 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1189 raise ValueError('Unknown chunk')
1189 raise ValueError('Unknown chunk')
1190
1190
1191 def _readheader(self):
1191 def _readheader(self):
1192 """read the header and setup the object"""
1192 """read the header and setup the object"""
1193 typesize = self._unpackheader(_fparttypesize)[0]
1193 typesize = self._unpackheader(_fparttypesize)[0]
1194 self.type = self._fromheader(typesize)
1194 self.type = self._fromheader(typesize)
1195 indebug(self.ui, 'part type: "%s"' % self.type)
1195 indebug(self.ui, 'part type: "%s"' % self.type)
1196 self.id = self._unpackheader(_fpartid)[0]
1196 self.id = self._unpackheader(_fpartid)[0]
1197 indebug(self.ui, 'part id: "%s"' % self.id)
1197 indebug(self.ui, 'part id: "%s"' % self.id)
1198 # extract mandatory bit from type
1198 # extract mandatory bit from type
1199 self.mandatory = (self.type != self.type.lower())
1199 self.mandatory = (self.type != self.type.lower())
1200 self.type = self.type.lower()
1200 self.type = self.type.lower()
1201 ## reading parameters
1201 ## reading parameters
1202 # param count
1202 # param count
1203 mancount, advcount = self._unpackheader(_fpartparamcount)
1203 mancount, advcount = self._unpackheader(_fpartparamcount)
1204 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1204 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1205 # param size
1205 # param size
1206 fparamsizes = _makefpartparamsizes(mancount + advcount)
1206 fparamsizes = _makefpartparamsizes(mancount + advcount)
1207 paramsizes = self._unpackheader(fparamsizes)
1207 paramsizes = self._unpackheader(fparamsizes)
1208 # make it a list of couples again
1208 # make it a list of couples again
1209 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1209 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1210 # split mandatory from advisory
1210 # split mandatory from advisory
1211 mansizes = paramsizes[:mancount]
1211 mansizes = paramsizes[:mancount]
1212 advsizes = paramsizes[mancount:]
1212 advsizes = paramsizes[mancount:]
1213 # retrieve param value
1213 # retrieve param value
1214 manparams = []
1214 manparams = []
1215 for key, value in mansizes:
1215 for key, value in mansizes:
1216 manparams.append((self._fromheader(key), self._fromheader(value)))
1216 manparams.append((self._fromheader(key), self._fromheader(value)))
1217 advparams = []
1217 advparams = []
1218 for key, value in advsizes:
1218 for key, value in advsizes:
1219 advparams.append((self._fromheader(key), self._fromheader(value)))
1219 advparams.append((self._fromheader(key), self._fromheader(value)))
1220 self._initparams(manparams, advparams)
1220 self._initparams(manparams, advparams)
1221 ## part payload
1221 ## part payload
1222 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1222 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1223 # we read the data, tell it
1223 # we read the data, tell it
1224 self._initialized = True
1224 self._initialized = True
1225
1225
1226 def read(self, size=None):
1226 def read(self, size=None):
1227 """read payload data"""
1227 """read payload data"""
1228 if not self._initialized:
1228 if not self._initialized:
1229 self._readheader()
1229 self._readheader()
1230 if size is None:
1230 if size is None:
1231 data = self._payloadstream.read()
1231 data = self._payloadstream.read()
1232 else:
1232 else:
1233 data = self._payloadstream.read(size)
1233 data = self._payloadstream.read(size)
1234 self._pos += len(data)
1234 self._pos += len(data)
1235 if size is None or len(data) < size:
1235 if size is None or len(data) < size:
1236 if not self.consumed and self._pos:
1236 if not self.consumed and self._pos:
1237 self.ui.debug('bundle2-input-part: total payload size %i\n'
1237 self.ui.debug('bundle2-input-part: total payload size %i\n'
1238 % self._pos)
1238 % self._pos)
1239 self.consumed = True
1239 self.consumed = True
1240 return data
1240 return data
1241
1241
1242 def tell(self):
1242 def tell(self):
1243 return self._pos
1243 return self._pos
1244
1244
1245 def seek(self, offset, whence=0):
1245 def seek(self, offset, whence=0):
1246 if whence == 0:
1246 if whence == 0:
1247 newpos = offset
1247 newpos = offset
1248 elif whence == 1:
1248 elif whence == 1:
1249 newpos = self._pos + offset
1249 newpos = self._pos + offset
1250 elif whence == 2:
1250 elif whence == 2:
1251 if not self.consumed:
1251 if not self.consumed:
1252 self.read()
1252 self.read()
1253 newpos = self._chunkindex[-1][0] - offset
1253 newpos = self._chunkindex[-1][0] - offset
1254 else:
1254 else:
1255 raise ValueError('Unknown whence value: %r' % (whence,))
1255 raise ValueError('Unknown whence value: %r' % (whence,))
1256
1256
1257 if newpos > self._chunkindex[-1][0] and not self.consumed:
1257 if newpos > self._chunkindex[-1][0] and not self.consumed:
1258 self.read()
1258 self.read()
1259 if not 0 <= newpos <= self._chunkindex[-1][0]:
1259 if not 0 <= newpos <= self._chunkindex[-1][0]:
1260 raise ValueError('Offset out of range')
1260 raise ValueError('Offset out of range')
1261
1261
1262 if self._pos != newpos:
1262 if self._pos != newpos:
1263 chunk, internaloffset = self._findchunk(newpos)
1263 chunk, internaloffset = self._findchunk(newpos)
1264 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1264 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1265 adjust = self.read(internaloffset)
1265 adjust = self.read(internaloffset)
1266 if len(adjust) != internaloffset:
1266 if len(adjust) != internaloffset:
1267 raise error.Abort(_('Seek failed\n'))
1267 raise error.Abort(_('Seek failed\n'))
1268 self._pos = newpos
1268 self._pos = newpos
1269
1269
1270 def _seekfp(self, offset, whence=0):
1270 def _seekfp(self, offset, whence=0):
1271 """move the underlying file pointer
1271 """move the underlying file pointer
1272
1272
1273 This method is meant for internal usage by the bundle2 protocol only.
1273 This method is meant for internal usage by the bundle2 protocol only.
1274 It directly manipulates the low level stream, including bundle2 level
1274 It directly manipulates the low level stream, including bundle2 level
1275 instructions.
1275 instructions.
1276
1276
1277 Do not use it to implement higher-level logic or methods."""
1277 Do not use it to implement higher-level logic or methods."""
1278 if self._seekable:
1278 if self._seekable:
1279 return self._fp.seek(offset, whence)
1279 return self._fp.seek(offset, whence)
1280 else:
1280 else:
1281 raise NotImplementedError(_('File pointer is not seekable'))
1281 raise NotImplementedError(_('File pointer is not seekable'))
1282
1282
1283 def _tellfp(self):
1283 def _tellfp(self):
1284 """return the file offset, or None if file is not seekable
1284 """return the file offset, or None if file is not seekable
1285
1285
1286 This method is meant for internal usage by the bundle2 protocol only.
1286 This method is meant for internal usage by the bundle2 protocol only.
1287 It directly manipulates the low level stream, including bundle2 level
1287 It directly manipulates the low level stream, including bundle2 level
1288 instructions.
1288 instructions.
1289
1289
1290 Do not use it to implement higher-level logic or methods."""
1290 Do not use it to implement higher-level logic or methods."""
1291 if self._seekable:
1291 if self._seekable:
1292 try:
1292 try:
1293 return self._fp.tell()
1293 return self._fp.tell()
1294 except IOError as e:
1294 except IOError as e:
1295 if e.errno == errno.ESPIPE:
1295 if e.errno == errno.ESPIPE:
1296 self._seekable = False
1296 self._seekable = False
1297 else:
1297 else:
1298 raise
1298 raise
1299 return None
1299 return None
1300
1300
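# --- Editor's illustration (not part of bundle2.py) -----------------------
# A minimal sketch of consuming a part payload with the file-like API of
# unbundlepart above. 'part' stands for an instance handed to a part
# handler; the 4096 chunk size is arbitrary. Rewinding with seek() only
# works when the underlying stream is seekable.
def _exampleconsumepart(part):
    data = []
    chunk = part.read(4096)
    while chunk:
        data.append(chunk)
        chunk = part.read(4096)
    part.seek(0)  # replay the payload from the recorded chunk index
    return ''.join(data)
# ---------------------------------------------------------------------------
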
1301 # These are only the static capabilities.
1301 # These are only the static capabilities.
1302 # Check the 'getrepocaps' function for the rest.
1302 # Check the 'getrepocaps' function for the rest.
1303 capabilities = {'HG20': (),
1303 capabilities = {'HG20': (),
1304 'error': ('abort', 'unsupportedcontent', 'pushraced',
1304 'error': ('abort', 'unsupportedcontent', 'pushraced',
1305 'pushkey'),
1305 'pushkey'),
1306 'listkeys': (),
1306 'listkeys': (),
1307 'pushkey': (),
1307 'pushkey': (),
1308 'digests': tuple(sorted(util.DIGESTS.keys())),
1308 'digests': tuple(sorted(util.DIGESTS.keys())),
1309 'remote-changegroup': ('http', 'https'),
1309 'remote-changegroup': ('http', 'https'),
1310 'hgtagsfnodes': (),
1310 'hgtagsfnodes': (),
1311 }
1311 }
1312
1312
1313 def getrepocaps(repo, allowpushback=False):
1313 def getrepocaps(repo, allowpushback=False):
1314 """return the bundle2 capabilities for a given repo
1314 """return the bundle2 capabilities for a given repo
1315
1315
1316 Exists to allow extensions (like evolution) to mutate the capabilities.
1316 Exists to allow extensions (like evolution) to mutate the capabilities.
1317 """
1317 """
1318 caps = capabilities.copy()
1318 caps = capabilities.copy()
1319 caps['changegroup'] = tuple(sorted(
1319 caps['changegroup'] = tuple(sorted(
1320 changegroup.supportedincomingversions(repo)))
1320 changegroup.supportedincomingversions(repo)))
1321 if obsolete.isenabled(repo, obsolete.exchangeopt):
1321 if obsolete.isenabled(repo, obsolete.exchangeopt):
1322 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1322 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1323 caps['obsmarkers'] = supportedformat
1323 caps['obsmarkers'] = supportedformat
1324 if allowpushback:
1324 if allowpushback:
1325 caps['pushback'] = ()
1325 caps['pushback'] = ()
1326 return caps
1326 return caps
1327
1327
1328 def bundle2caps(remote):
1328 def bundle2caps(remote):
1329 """return the bundle capabilities of a peer as dict"""
1329 """return the bundle capabilities of a peer as dict"""
1330 raw = remote.capable('bundle2')
1330 raw = remote.capable('bundle2')
1331 if not raw and raw != '':
1331 if not raw and raw != '':
1332 return {}
1332 return {}
1333 capsblob = urlreq.unquote(remote.capable('bundle2'))
1333 capsblob = urlreq.unquote(remote.capable('bundle2'))
1334 return decodecaps(capsblob)
1334 return decodecaps(capsblob)
1335
1335
1336 def obsmarkersversion(caps):
1336 def obsmarkersversion(caps):
1337 """extract the list of supported obsmarkers versions from a bundle2caps dict
1337 """extract the list of supported obsmarkers versions from a bundle2caps dict
1338 """
1338 """
1339 obscaps = caps.get('obsmarkers', ())
1339 obscaps = caps.get('obsmarkers', ())
1340 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1340 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1341
1341
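# --- Editor's illustration (not part of bundle2.py) -----------------------
# A minimal sketch of the capability dictionaries handled above. The values
# advertised by a real server vary; these are made-up samples showing what
# obsmarkersversion() extracts.
def _examplecaps():
    caps = {'HG20': (),
            'changegroup': ('01', '02'),
            'obsmarkers': ('V0', 'V1')}
    assert obsmarkersversion(caps) == [0, 1]
    assert obsmarkersversion({}) == []
    return caps
# ---------------------------------------------------------------------------
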
1342 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1342 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1343 vfs=None, compression=None, compopts=None):
1343 vfs=None, compression=None, compopts=None):
1344 if bundletype.startswith('HG10'):
1344 if bundletype.startswith('HG10'):
1345 cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
1345 cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
1346 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1346 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1347 compression=compression, compopts=compopts)
1347 compression=compression, compopts=compopts)
1348 elif not bundletype.startswith('HG20'):
1348 elif not bundletype.startswith('HG20'):
1349 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1349 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1350
1350
1351 bundle = bundle20(ui)
1351 bundle = bundle20(ui)
1352 bundle.setcompression(compression, compopts)
1352 bundle.setcompression(compression, compopts)
1353 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1353 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1354 chunkiter = bundle.getchunks()
1354 chunkiter = bundle.getchunks()
1355
1355
1356 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1356 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1357
1357
1358 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1358 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1359 # We should eventually reconcile this logic with the one behind
1359 # We should eventually reconcile this logic with the one behind
1360 # 'exchange.getbundle2partsgenerator'.
1360 # 'exchange.getbundle2partsgenerator'.
1361 #
1361 #
1362 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1362 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1363 # different right now. So we keep them separated for now for the sake of
1363 # different right now. So we keep them separated for now for the sake of
1364 # simplicity.
1364 # simplicity.
1365
1365
1366 # we always want a changegroup in such bundle
1366 # we always want a changegroup in such bundle
1367 cgversion = opts.get('cg.version')
1367 cgversion = opts.get('cg.version')
1368 if cgversion is None:
1368 if cgversion is None:
1369 cgversion = changegroup.safeversion(repo)
1369 cgversion = changegroup.safeversion(repo)
1370 cg = changegroup.getchangegroup(repo, source, outgoing,
1370 cg = changegroup.getchangegroup(repo, source, outgoing,
1371 version=cgversion)
1371 version=cgversion)
1372 part = bundler.newpart('changegroup', data=cg.getchunks())
1372 part = bundler.newpart('changegroup', data=cg.getchunks())
1373 part.addparam('version', cg.version)
1373 part.addparam('version', cg.version)
1374 if 'clcount' in cg.extras:
1374 if 'clcount' in cg.extras:
1375 part.addparam('nbchanges', str(cg.extras['clcount']),
1375 part.addparam('nbchanges', str(cg.extras['clcount']),
1376 mandatory=False)
1376 mandatory=False)
1377
1377
1378 addparttagsfnodescache(repo, bundler, outgoing)
1378 addparttagsfnodescache(repo, bundler, outgoing)
1379
1379
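# --- Editor's illustration (not part of bundle2.py) -----------------------
# A minimal sketch of driving writenewbundle() above. The source string,
# filename and 'cg.version' value are made up; compression is left at None
# here to avoid assuming a particular engine name.
def _examplewritenewbundle(ui, repo, outgoing):
    return writenewbundle(ui, repo, 'bundle', 'example.hg', 'HG20', outgoing,
                          {'cg.version': '02'})
# ---------------------------------------------------------------------------
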
1380 def addparttagsfnodescache(repo, bundler, outgoing):
1380 def addparttagsfnodescache(repo, bundler, outgoing):
1381 # we include the tags fnode cache for the bundle changeset
1381 # we include the tags fnode cache for the bundle changeset
1382 # (as an optional part)
1382 # (as an optional part)
1383 cache = tags.hgtagsfnodescache(repo.unfiltered())
1383 cache = tags.hgtagsfnodescache(repo.unfiltered())
1384 chunks = []
1384 chunks = []
1385
1385
1386 # .hgtags fnodes are only relevant for head changesets. While we could
1386 # .hgtags fnodes are only relevant for head changesets. While we could
1387 # transfer values for all known nodes, there will likely be little to
1387 # transfer values for all known nodes, there will likely be little to
1388 # no benefit.
1388 # no benefit.
1389 #
1389 #
1390 # We don't bother using a generator to produce output data because
1390 # We don't bother using a generator to produce output data because
1391 # a) we only have 40 bytes per head and even esoteric numbers of heads
1391 # a) we only have 40 bytes per head and even esoteric numbers of heads
1392 # consume little memory (1M heads is 40MB) b) we don't want to send the
1392 # consume little memory (1M heads is 40MB) b) we don't want to send the
1393 # part if we don't have entries and knowing if we have entries requires
1393 # part if we don't have entries and knowing if we have entries requires
1394 # cache lookups.
1394 # cache lookups.
1395 for node in outgoing.missingheads:
1395 for node in outgoing.missingheads:
1396 # Don't compute missing, as this may slow down serving.
1396 # Don't compute missing, as this may slow down serving.
1397 fnode = cache.getfnode(node, computemissing=False)
1397 fnode = cache.getfnode(node, computemissing=False)
1398 if fnode is not None:
1398 if fnode is not None:
1399 chunks.extend([node, fnode])
1399 chunks.extend([node, fnode])
1400
1400
1401 if chunks:
1401 if chunks:
1402 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1402 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1403
1403
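# --- Editor's illustration (not part of bundle2.py) -----------------------
# A minimal sketch of the 'hgtagsfnodes' payload layout built above: a flat
# concatenation of (20-byte head node, 20-byte .hgtags filenode) pairs. The
# decoder below is hypothetical and only shown for clarity.
def _exampledecodehgtagsfnodes(data):
    assert len(data) % 40 == 0
    for offset in range(0, len(data), 40):
        node = data[offset:offset + 20]
        fnode = data[offset + 20:offset + 40]
        yield node, fnode
# ---------------------------------------------------------------------------
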
1404 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1404 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1405 compopts=None):
1405 compopts=None):
1406 """Write a bundle file and return its filename.
1406 """Write a bundle file and return its filename.
1407
1407
1408 Existing files will not be overwritten.
1408 Existing files will not be overwritten.
1409 If no filename is specified, a temporary file is created.
1409 If no filename is specified, a temporary file is created.
1410 bz2 compression can be turned off.
1410 bz2 compression can be turned off.
1411 The bundle file will be deleted in case of errors.
1411 The bundle file will be deleted in case of errors.
1412 """
1412 """
1413
1413
1414 if bundletype == "HG20":
1414 if bundletype == "HG20":
1415 bundle = bundle20(ui)
1415 bundle = bundle20(ui)
1416 bundle.setcompression(compression, compopts)
1416 bundle.setcompression(compression, compopts)
1417 part = bundle.newpart('changegroup', data=cg.getchunks())
1417 part = bundle.newpart('changegroup', data=cg.getchunks())
1418 part.addparam('version', cg.version)
1418 part.addparam('version', cg.version)
1419 if 'clcount' in cg.extras:
1419 if 'clcount' in cg.extras:
1420 part.addparam('nbchanges', str(cg.extras['clcount']),
1420 part.addparam('nbchanges', str(cg.extras['clcount']),
1421 mandatory=False)
1421 mandatory=False)
1422 chunkiter = bundle.getchunks()
1422 chunkiter = bundle.getchunks()
1423 else:
1423 else:
1424 # compression argument is only for the bundle2 case
1424 # compression argument is only for the bundle2 case
1425 assert compression is None
1425 assert compression is None
1426 if cg.version != '01':
1426 if cg.version != '01':
1427 raise error.Abort(_('old bundle types only supports v1 '
1427 raise error.Abort(_('old bundle types only supports v1 '
1428 'changegroups'))
1428 'changegroups'))
1429 header, comp = bundletypes[bundletype]
1429 header, comp = bundletypes[bundletype]
1430 if comp not in util.compengines.supportedbundletypes:
1430 if comp not in util.compengines.supportedbundletypes:
1431 raise error.Abort(_('unknown stream compression type: %s')
1431 raise error.Abort(_('unknown stream compression type: %s')
1432 % comp)
1432 % comp)
1433 compengine = util.compengines.forbundletype(comp)
1433 compengine = util.compengines.forbundletype(comp)
1434 def chunkiter():
1434 def chunkiter():
1435 yield header
1435 yield header
1436 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1436 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1437 yield chunk
1437 yield chunk
1438 chunkiter = chunkiter()
1438 chunkiter = chunkiter()
1439
1439
1440 # parse the changegroup data, otherwise we will block
1440 # parse the changegroup data, otherwise we will block
1441 # in case of sshrepo because we don't know the end of the stream
1441 # in case of sshrepo because we don't know the end of the stream
1442 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1442 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1443
1443
1444 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1444 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1445 def handlechangegroup(op, inpart):
1445 def handlechangegroup(op, inpart):
1446 """apply a changegroup part on the repo
1446 """apply a changegroup part on the repo
1447
1447
1448 This is a very early implementation that will be massively reworked before
1448 This is a very early implementation that will be massively reworked before
1449 being inflicted on any end-user.
1449 being inflicted on any end-user.
1450 """
1450 """
1451 # Make sure we trigger a transaction creation
1451 # Make sure we trigger a transaction creation
1452 #
1452 #
1453 # The addchangegroup function will get a transaction object by itself, but
1453 # The addchangegroup function will get a transaction object by itself, but
1454 # we need to make sure we trigger the creation of a transaction object used
1454 # we need to make sure we trigger the creation of a transaction object used
1455 # for the whole processing scope.
1455 # for the whole processing scope.
1456 op.gettransaction()
1456 op.gettransaction()
1457 unpackerversion = inpart.params.get('version', '01')
1457 unpackerversion = inpart.params.get('version', '01')
1458 # We should raise an appropriate exception here
1458 # We should raise an appropriate exception here
1459 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1459 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1460 # the source and url passed here are overwritten by the ones contained in
1460 # the source and url passed here are overwritten by the ones contained in
1461 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1461 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1462 nbchangesets = None
1462 nbchangesets = None
1463 if 'nbchanges' in inpart.params:
1463 if 'nbchanges' in inpart.params:
1464 nbchangesets = int(inpart.params.get('nbchanges'))
1464 nbchangesets = int(inpart.params.get('nbchanges'))
1465 if ('treemanifest' in inpart.params and
1465 if ('treemanifest' in inpart.params and
1466 'treemanifest' not in op.repo.requirements):
1466 'treemanifest' not in op.repo.requirements):
1467 if len(op.repo.changelog) != 0:
1467 if len(op.repo.changelog) != 0:
1468 raise error.Abort(_(
1468 raise error.Abort(_(
1469 "bundle contains tree manifests, but local repo is "
1469 "bundle contains tree manifests, but local repo is "
1470 "non-empty and does not use tree manifests"))
1470 "non-empty and does not use tree manifests"))
1471 op.repo.requirements.add('treemanifest')
1471 op.repo.requirements.add('treemanifest')
1472 op.repo._applyopenerreqs()
1472 op.repo._applyopenerreqs()
1473 op.repo._writerequirements()
1473 op.repo._writerequirements()
1474 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1474 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1475 op.records.add('changegroup', {'return': ret})
1475 op.records.add('changegroup', {'return': ret})
1476 if op.reply is not None:
1476 if op.reply is not None:
1477 # This is definitely not the final form of this
1477 # This is definitely not the final form of this
1478 # return. But one need to start somewhere.
1478 # return. But one need to start somewhere.
1479 part = op.reply.newpart('reply:changegroup', mandatory=False)
1479 part = op.reply.newpart('reply:changegroup', mandatory=False)
1480 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1480 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1481 part.addparam('return', '%i' % ret, mandatory=False)
1481 part.addparam('return', '%i' % ret, mandatory=False)
1482 assert not inpart.read()
1482 assert not inpart.read()
1483
1483
1484 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1484 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1485 ['digest:%s' % k for k in util.DIGESTS.keys()])
1485 ['digest:%s' % k for k in util.DIGESTS.keys()])
1486 @parthandler('remote-changegroup', _remotechangegroupparams)
1486 @parthandler('remote-changegroup', _remotechangegroupparams)
1487 def handleremotechangegroup(op, inpart):
1487 def handleremotechangegroup(op, inpart):
1488 """apply a bundle10 on the repo, given an url and validation information
1488 """apply a bundle10 on the repo, given an url and validation information
1489
1489
1490 All the information about the remote bundle to import is given as
1490 All the information about the remote bundle to import is given as
1491 parameters. The parameters include:
1491 parameters. The parameters include:
1492 - url: the url to the bundle10.
1492 - url: the url to the bundle10.
1493 - size: the bundle10 file size. It is used to validate what was
1493 - size: the bundle10 file size. It is used to validate what was
1494 retrieved by the client matches the server knowledge about the bundle.
1494 retrieved by the client matches the server knowledge about the bundle.
1495 - digests: a space separated list of the digest types provided as
1495 - digests: a space separated list of the digest types provided as
1496 parameters.
1496 parameters.
1497 - digest:<digest-type>: the hexadecimal representation of the digest with
1497 - digest:<digest-type>: the hexadecimal representation of the digest with
1498 that name. Like the size, it is used to validate what was retrieved by
1498 that name. Like the size, it is used to validate what was retrieved by
1499 the client matches what the server knows about the bundle.
1499 the client matches what the server knows about the bundle.
1500
1500
1501 When multiple digest types are given, all of them are checked.
1501 When multiple digest types are given, all of them are checked.
1502 """
1502 """
1503 try:
1503 try:
1504 raw_url = inpart.params['url']
1504 raw_url = inpart.params['url']
1505 except KeyError:
1505 except KeyError:
1506 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1506 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1507 parsed_url = util.url(raw_url)
1507 parsed_url = util.url(raw_url)
1508 if parsed_url.scheme not in capabilities['remote-changegroup']:
1508 if parsed_url.scheme not in capabilities['remote-changegroup']:
1509 raise error.Abort(_('remote-changegroup does not support %s urls') %
1509 raise error.Abort(_('remote-changegroup does not support %s urls') %
1510 parsed_url.scheme)
1510 parsed_url.scheme)
1511
1511
1512 try:
1512 try:
1513 size = int(inpart.params['size'])
1513 size = int(inpart.params['size'])
1514 except ValueError:
1514 except ValueError:
1515 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1515 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1516 % 'size')
1516 % 'size')
1517 except KeyError:
1517 except KeyError:
1518 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1518 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1519
1519
1520 digests = {}
1520 digests = {}
1521 for typ in inpart.params.get('digests', '').split():
1521 for typ in inpart.params.get('digests', '').split():
1522 param = 'digest:%s' % typ
1522 param = 'digest:%s' % typ
1523 try:
1523 try:
1524 value = inpart.params[param]
1524 value = inpart.params[param]
1525 except KeyError:
1525 except KeyError:
1526 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1526 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1527 param)
1527 param)
1528 digests[typ] = value
1528 digests[typ] = value
1529
1529
1530 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1530 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1531
1531
1532 # Make sure we trigger a transaction creation
1532 # Make sure we trigger a transaction creation
1533 #
1533 #
1534 # The addchangegroup function will get a transaction object by itself, but
1534 # The addchangegroup function will get a transaction object by itself, but
1535 # we need to make sure we trigger the creation of a transaction object used
1535 # we need to make sure we trigger the creation of a transaction object used
1536 # for the whole processing scope.
1536 # for the whole processing scope.
1537 op.gettransaction()
1537 op.gettransaction()
1538 from . import exchange
1538 from . import exchange
1539 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1539 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1540 if not isinstance(cg, changegroup.cg1unpacker):
1540 if not isinstance(cg, changegroup.cg1unpacker):
1541 raise error.Abort(_('%s: not a bundle version 1.0') %
1541 raise error.Abort(_('%s: not a bundle version 1.0') %
1542 util.hidepassword(raw_url))
1542 util.hidepassword(raw_url))
1543 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1543 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1544 op.records.add('changegroup', {'return': ret})
1544 op.records.add('changegroup', {'return': ret})
1545 if op.reply is not None:
1545 if op.reply is not None:
1546 # This is definitely not the final form of this
1546 # This is definitely not the final form of this
1547 # return. But one need to start somewhere.
1547 # return. But one need to start somewhere.
1548 part = op.reply.newpart('reply:changegroup')
1548 part = op.reply.newpart('reply:changegroup')
1549 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1549 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1550 part.addparam('return', '%i' % ret, mandatory=False)
1550 part.addparam('return', '%i' % ret, mandatory=False)
1551 try:
1551 try:
1552 real_part.validate()
1552 real_part.validate()
1553 except error.Abort as e:
1553 except error.Abort as e:
1554 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1554 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1555 (util.hidepassword(raw_url), str(e)))
1555 (util.hidepassword(raw_url), str(e)))
1556 assert not inpart.read()
1556 assert not inpart.read()
1557
1557
1558 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1558 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1559 def handlereplychangegroup(op, inpart):
1559 def handlereplychangegroup(op, inpart):
1560 ret = int(inpart.params['return'])
1560 ret = int(inpart.params['return'])
1561 replyto = int(inpart.params['in-reply-to'])
1561 replyto = int(inpart.params['in-reply-to'])
1562 op.records.add('changegroup', {'return': ret}, replyto)
1562 op.records.add('changegroup', {'return': ret}, replyto)
1563
1563
1564 @parthandler('check:heads')
1564 @parthandler('check:heads')
1565 def handlecheckheads(op, inpart):
1565 def handlecheckheads(op, inpart):
1566 """check that head of the repo did not change
1566 """check that head of the repo did not change
1567
1567
1568 This is used to detect a push race when using unbundle.
1568 This is used to detect a push race when using unbundle.
1569 This replaces the "heads" argument of unbundle."""
1569 This replaces the "heads" argument of unbundle."""
1570 h = inpart.read(20)
1570 h = inpart.read(20)
1571 heads = []
1571 heads = []
1572 while len(h) == 20:
1572 while len(h) == 20:
1573 heads.append(h)
1573 heads.append(h)
1574 h = inpart.read(20)
1574 h = inpart.read(20)
1575 assert not h
1575 assert not h
1576 # Trigger a transaction so that we are guaranteed to have the lock now.
1576 # Trigger a transaction so that we are guaranteed to have the lock now.
1577 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1577 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1578 op.gettransaction()
1578 op.gettransaction()
1579 if sorted(heads) != sorted(op.repo.heads()):
1579 if sorted(heads) != sorted(op.repo.heads()):
1580 raise error.PushRaced('repository changed while pushing - '
1580 raise error.PushRaced('repository changed while pushing - '
1581 'please try again')
1581 'please try again')
1582
1582
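# --- Editor's illustration (not part of bundle2.py) -----------------------
# A minimal sketch of how a 'check:heads' payload could be built on the
# sending side: a concatenation of the current 20-byte head nodes, which the
# handler above reads back 20 bytes at a time. Both sides are sorted before
# comparison, so the order of the heads does not matter.
def _examplecheckheadspayload(repo):
    return ''.join(repo.heads())
# ---------------------------------------------------------------------------
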
1583 @parthandler('output')
1583 @parthandler('output')
1584 def handleoutput(op, inpart):
1584 def handleoutput(op, inpart):
1585 """forward output captured on the server to the client"""
1585 """forward output captured on the server to the client"""
1586 for line in inpart.read().splitlines():
1586 for line in inpart.read().splitlines():
1587 op.ui.status(_('remote: %s\n') % line)
1587 op.ui.status(_('remote: %s\n') % line)
1588
1588
1589 @parthandler('replycaps')
1589 @parthandler('replycaps')
1590 def handlereplycaps(op, inpart):
1590 def handlereplycaps(op, inpart):
1591 """Notify that a reply bundle should be created
1591 """Notify that a reply bundle should be created
1592
1592
1593 The payload contains the capabilities information for the reply"""
1593 The payload contains the capabilities information for the reply"""
1594 caps = decodecaps(inpart.read())
1594 caps = decodecaps(inpart.read())
1595 if op.reply is None:
1595 if op.reply is None:
1596 op.reply = bundle20(op.ui, caps)
1596 op.reply = bundle20(op.ui, caps)
1597
1597
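# --- Editor's illustration (not part of bundle2.py) -----------------------
# A minimal sketch of registering a new part handler with the @parthandler
# decorator used throughout this module. The 'x-example:ping' part type and
# its 'message' parameter are made up; extensions pick their own names.
def _exampleregisterparthandler():
    @parthandler('x-example:ping', ('message',))
    def handleexampleping(op, inpart):
        """print an advisory message on the receiving side"""
        op.ui.status(_('ping: %s\n') % inpart.params.get('message', ''))
# ---------------------------------------------------------------------------
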
1598 class AbortFromPart(error.Abort):
1598 class AbortFromPart(error.Abort):
1599 """Sub-class of Abort that denotes an error from a bundle2 part."""
1599 """Sub-class of Abort that denotes an error from a bundle2 part."""
1600
1600
1601 @parthandler('error:abort', ('message', 'hint'))
1601 @parthandler('error:abort', ('message', 'hint'))
1602 def handleerrorabort(op, inpart):
1602 def handleerrorabort(op, inpart):
1603 """Used to transmit abort error over the wire"""
1603 """Used to transmit abort error over the wire"""
1604 raise AbortFromPart(inpart.params['message'],
1604 raise AbortFromPart(inpart.params['message'],
1605 hint=inpart.params.get('hint'))
1605 hint=inpart.params.get('hint'))
1606
1606
1607 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1607 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1608 'in-reply-to'))
1608 'in-reply-to'))
1609 def handleerrorpushkey(op, inpart):
1609 def handleerrorpushkey(op, inpart):
1610 """Used to transmit failure of a mandatory pushkey over the wire"""
1610 """Used to transmit failure of a mandatory pushkey over the wire"""
1611 kwargs = {}
1611 kwargs = {}
1612 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1612 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1613 value = inpart.params.get(name)
1613 value = inpart.params.get(name)
1614 if value is not None:
1614 if value is not None:
1615 kwargs[name] = value
1615 kwargs[name] = value
1616 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1616 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1617
1617
1618 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1618 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1619 def handleerrorunsupportedcontent(op, inpart):
1619 def handleerrorunsupportedcontent(op, inpart):
1620 """Used to transmit unknown content error over the wire"""
1620 """Used to transmit unknown content error over the wire"""
1621 kwargs = {}
1621 kwargs = {}
1622 parttype = inpart.params.get('parttype')
1622 parttype = inpart.params.get('parttype')
1623 if parttype is not None:
1623 if parttype is not None:
1624 kwargs['parttype'] = parttype
1624 kwargs['parttype'] = parttype
1625 params = inpart.params.get('params')
1625 params = inpart.params.get('params')
1626 if params is not None:
1626 if params is not None:
1627 kwargs['params'] = params.split('\0')
1627 kwargs['params'] = params.split('\0')
1628
1628
1629 raise error.BundleUnknownFeatureError(**kwargs)
1629 raise error.BundleUnknownFeatureError(**kwargs)
1630
1630
1631 @parthandler('error:pushraced', ('message',))
1631 @parthandler('error:pushraced', ('message',))
1632 def handleerrorpushraced(op, inpart):
1632 def handleerrorpushraced(op, inpart):
1633 """Used to transmit push race error over the wire"""
1633 """Used to transmit push race error over the wire"""
1634 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1634 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1635
1635
1636 @parthandler('listkeys', ('namespace',))
1636 @parthandler('listkeys', ('namespace',))
1637 def handlelistkeys(op, inpart):
1637 def handlelistkeys(op, inpart):
1638 """retrieve pushkey namespace content stored in a bundle2"""
1638 """retrieve pushkey namespace content stored in a bundle2"""
1639 namespace = inpart.params['namespace']
1639 namespace = inpart.params['namespace']
1640 r = pushkey.decodekeys(inpart.read())
1640 r = pushkey.decodekeys(inpart.read())
1641 op.records.add('listkeys', (namespace, r))
1641 op.records.add('listkeys', (namespace, r))
1642
1642
1643 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1643 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1644 def handlepushkey(op, inpart):
1644 def handlepushkey(op, inpart):
1645 """process a pushkey request"""
1645 """process a pushkey request"""
1646 dec = pushkey.decode
1646 dec = pushkey.decode
1647 namespace = dec(inpart.params['namespace'])
1647 namespace = dec(inpart.params['namespace'])
1648 key = dec(inpart.params['key'])
1648 key = dec(inpart.params['key'])
1649 old = dec(inpart.params['old'])
1649 old = dec(inpart.params['old'])
1650 new = dec(inpart.params['new'])
1650 new = dec(inpart.params['new'])
1651 # Grab the transaction to ensure that we have the lock before performing the
1651 # Grab the transaction to ensure that we have the lock before performing the
1652 # pushkey.
1652 # pushkey.
1653 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1653 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1654 op.gettransaction()
1654 op.gettransaction()
1655 ret = op.repo.pushkey(namespace, key, old, new)
1655 ret = op.repo.pushkey(namespace, key, old, new)
1656 record = {'namespace': namespace,
1656 record = {'namespace': namespace,
1657 'key': key,
1657 'key': key,
1658 'old': old,
1658 'old': old,
1659 'new': new}
1659 'new': new}
1660 op.records.add('pushkey', record)
1660 op.records.add('pushkey', record)
1661 if op.reply is not None:
1661 if op.reply is not None:
1662 rpart = op.reply.newpart('reply:pushkey')
1662 rpart = op.reply.newpart('reply:pushkey')
1663 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1663 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1664 rpart.addparam('return', '%i' % ret, mandatory=False)
1664 rpart.addparam('return', '%i' % ret, mandatory=False)
1665 if inpart.mandatory and not ret:
1665 if inpart.mandatory and not ret:
1666 kwargs = {}
1666 kwargs = {}
1667 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1667 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1668 if key in inpart.params:
1668 if key in inpart.params:
1669 kwargs[key] = inpart.params[key]
1669 kwargs[key] = inpart.params[key]
1670 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1670 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1671
1671
1672 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1672 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1673 def handlepushkeyreply(op, inpart):
1673 def handlepushkeyreply(op, inpart):
1674 """retrieve the result of a pushkey request"""
1674 """retrieve the result of a pushkey request"""
1675 ret = int(inpart.params['return'])
1675 ret = int(inpart.params['return'])
1676 partid = int(inpart.params['in-reply-to'])
1676 partid = int(inpart.params['in-reply-to'])
1677 op.records.add('pushkey', {'return': ret}, partid)
1677 op.records.add('pushkey', {'return': ret}, partid)
1678
1678
1679 @parthandler('obsmarkers')
1679 @parthandler('obsmarkers')
1680 def handleobsmarker(op, inpart):
1680 def handleobsmarker(op, inpart):
1681 """add a stream of obsmarkers to the repo"""
1681 """add a stream of obsmarkers to the repo"""
1682 tr = op.gettransaction()
1682 tr = op.gettransaction()
1683 markerdata = inpart.read()
1683 markerdata = inpart.read()
1684 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1684 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1685 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1685 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1686 % len(markerdata))
1686 % len(markerdata))
1687 # The mergemarkers call will crash if marker creation is not enabled.
1687 # The mergemarkers call will crash if marker creation is not enabled.
1688 # we want to avoid this if the part is advisory.
1688 # we want to avoid this if the part is advisory.
1689 if not inpart.mandatory and op.repo.obsstore.readonly:
1689 if not inpart.mandatory and op.repo.obsstore.readonly:
1690 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1690 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1691 return
1691 return
1692 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1692 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1693 op.repo.invalidatevolatilesets()
1693 if new:
1694 if new:
1694 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1695 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1695 op.records.add('obsmarkers', {'new': new})
1696 op.records.add('obsmarkers', {'new': new})
1696 if op.reply is not None:
1697 if op.reply is not None:
1697 rpart = op.reply.newpart('reply:obsmarkers')
1698 rpart = op.reply.newpart('reply:obsmarkers')
1698 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1699 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1699 rpart.addparam('new', '%i' % new, mandatory=False)
1700 rpart.addparam('new', '%i' % new, mandatory=False)
1700
1701
1701
1702
1702 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1703 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1703 def handleobsmarkerreply(op, inpart):
1704 def handleobsmarkerreply(op, inpart):
1704 """retrieve the result of a pushkey request"""
1705 """retrieve the result of a pushkey request"""
1705 ret = int(inpart.params['new'])
1706 ret = int(inpart.params['new'])
1706 partid = int(inpart.params['in-reply-to'])
1707 partid = int(inpart.params['in-reply-to'])
1707 op.records.add('obsmarkers', {'new': ret}, partid)
1708 op.records.add('obsmarkers', {'new': ret}, partid)
1708
1709
1709 @parthandler('hgtagsfnodes')
1710 @parthandler('hgtagsfnodes')
1710 def handlehgtagsfnodes(op, inpart):
1711 def handlehgtagsfnodes(op, inpart):
1711 """Applies .hgtags fnodes cache entries to the local repo.
1712 """Applies .hgtags fnodes cache entries to the local repo.
1712
1713
1713 Payload is pairs of 20 byte changeset nodes and filenodes.
1714 Payload is pairs of 20 byte changeset nodes and filenodes.
1714 """
1715 """
1715 # Grab the transaction so we ensure that we have the lock at this point.
1716 # Grab the transaction so we ensure that we have the lock at this point.
1716 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1717 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1717 op.gettransaction()
1718 op.gettransaction()
1718 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1719 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1719
1720
1720 count = 0
1721 count = 0
1721 while True:
1722 while True:
1722 node = inpart.read(20)
1723 node = inpart.read(20)
1723 fnode = inpart.read(20)
1724 fnode = inpart.read(20)
1724 if len(node) < 20 or len(fnode) < 20:
1725 if len(node) < 20 or len(fnode) < 20:
1725 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1726 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1726 break
1727 break
1727 cache.setfnode(node, fnode)
1728 cache.setfnode(node, fnode)
1728 count += 1
1729 count += 1
1729
1730
1730 cache.write()
1731 cache.write()
1731 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1732 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
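A minimal illustrative sketch of the sending side of the handler above: the 'hgtagsfnodes' payload is just back-to-back pairs of 20-byte binary changeset nodes and .hgtags filenodes. The helper name below is hypothetical and not part of this change; `entries` is assumed to already hold (node, fnode) pairs as 20-byte binary strings.

    def encodehgtagsfnodes(entries):
        # hypothetical sketch: concatenate fixed-width (node, fnode) pairs
        chunks = []
        for csnode, fnode in entries:
            assert len(csnode) == 20 and len(fnode) == 20
            chunks.append(csnode)
            chunks.append(fnode)
        return ''.join(chunks)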
@@ -1,1292 +1,1293 b''
1 # obsolete.py - obsolete markers handling
1 # obsolete.py - obsolete markers handling
2 #
2 #
3 # Copyright 2012 Pierre-Yves David <pierre-yves.david@ens-lyon.org>
3 # Copyright 2012 Pierre-Yves David <pierre-yves.david@ens-lyon.org>
4 # Logilab SA <contact@logilab.fr>
4 # Logilab SA <contact@logilab.fr>
5 #
5 #
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8
8
9 """Obsolete marker handling
9 """Obsolete marker handling
10
10
11 An obsolete marker maps an old changeset to a list of new
11 An obsolete marker maps an old changeset to a list of new
12 changesets. If the list of new changesets is empty, the old changeset
12 changesets. If the list of new changesets is empty, the old changeset
13 is said to be "killed". Otherwise, the old changeset is being
13 is said to be "killed". Otherwise, the old changeset is being
14 "replaced" by the new changesets.
14 "replaced" by the new changesets.
15
15
16 Obsolete markers can be used to record and distribute changeset graph
16 Obsolete markers can be used to record and distribute changeset graph
17 transformations performed by history rewrite operations, and help
17 transformations performed by history rewrite operations, and help
18 building new tools to reconcile conflicting rewrite actions. To
18 building new tools to reconcile conflicting rewrite actions. To
19 facilitate conflict resolution, markers include various annotations
19 facilitate conflict resolution, markers include various annotations
20 besides old and new changeset identifiers, such as creation date or
20 besides old and new changeset identifiers, such as creation date or
21 author name.
21 author name.
22
22
23 The old obsoleted changeset is called a "precursor" and possible
23 The old obsoleted changeset is called a "precursor" and possible
24 replacements are called "successors". Markers that used changeset X as
24 replacements are called "successors". Markers that used changeset X as
25 a precursor are called "successor markers of X" because they hold
25 a precursor are called "successor markers of X" because they hold
26 information about the successors of X. Markers that use changeset Y as
26 information about the successors of X. Markers that use changeset Y as
27 a successor are called "precursor markers of Y" because they hold
27 a successor are called "precursor markers of Y" because they hold
28 information about the precursors of Y.
28 information about the precursors of Y.
29
29
30 Examples:
30 Examples:
31
31
32 - When changeset A is replaced by changeset A', one marker is stored:
32 - When changeset A is replaced by changeset A', one marker is stored:
33
33
34 (A, (A',))
34 (A, (A',))
35
35
36 - When changesets A and B are folded into a new changeset C, two markers are
36 - When changesets A and B are folded into a new changeset C, two markers are
37 stored:
37 stored:
38
38
39 (A, (C,)) and (B, (C,))
39 (A, (C,)) and (B, (C,))
40
40
41 - When changeset A is simply "pruned" from the graph, a marker is created:
41 - When changeset A is simply "pruned" from the graph, a marker is created:
42
42
43 (A, ())
43 (A, ())
44
44
45 - When changeset A is split into B and C, a single marker is used:
45 - When changeset A is split into B and C, a single marker is used:
46
46
47 (A, (B, C))
47 (A, (B, C))
48
48
49 We use a single marker to distinguish the "split" case from the "divergence"
49 We use a single marker to distinguish the "split" case from the "divergence"
50 case. If two independent operations rewrite the same changeset A into A' and
50 case. If two independent operations rewrite the same changeset A into A' and
51 A'', we have an error case: divergent rewriting. We can detect it because
51 A'', we have an error case: divergent rewriting. We can detect it because
52 two markers will be created independently:
52 two markers will be created independently:
53
53
54 (A, (B,)) and (A, (C,))
54 (A, (B,)) and (A, (C,))
55
55
56 Format
56 Format
57 ------
57 ------
58
58
59 Markers are stored in an append-only file stored in
59 Markers are stored in an append-only file stored in
60 '.hg/store/obsstore'.
60 '.hg/store/obsstore'.
61
61
62 The file starts with a version header:
62 The file starts with a version header:
63
63
64 - 1 unsigned byte: version number, starting at zero.
64 - 1 unsigned byte: version number, starting at zero.
65
65
66 The header is followed by the markers. Marker format depends on the version. See
66 The header is followed by the markers. Marker format depends on the version. See
67 comment associated with each format for details.
67 comment associated with each format for details.
68
68
69 """
69 """
70 from __future__ import absolute_import
70 from __future__ import absolute_import
71
71
72 import errno
72 import errno
73 import struct
73 import struct
74
74
75 from .i18n import _
75 from .i18n import _
76 from . import (
76 from . import (
77 error,
77 error,
78 node,
78 node,
79 phases,
79 phases,
80 policy,
80 policy,
81 util,
81 util,
82 )
82 )
83
83
84 parsers = policy.importmod(r'parsers')
84 parsers = policy.importmod(r'parsers')
85
85
86 _pack = struct.pack
86 _pack = struct.pack
87 _unpack = struct.unpack
87 _unpack = struct.unpack
88 _calcsize = struct.calcsize
88 _calcsize = struct.calcsize
89 propertycache = util.propertycache
89 propertycache = util.propertycache
90
90
91 # the obsolete feature is not mature enough to be enabled by default.
91 # the obsolete feature is not mature enough to be enabled by default.
92 # you have to rely on a third party extension to enable this.
92 # you have to rely on a third party extension to enable this.
93 _enabled = False
93 _enabled = False
94
94
95 # Options for obsolescence
95 # Options for obsolescence
96 createmarkersopt = 'createmarkers'
96 createmarkersopt = 'createmarkers'
97 allowunstableopt = 'allowunstable'
97 allowunstableopt = 'allowunstable'
98 exchangeopt = 'exchange'
98 exchangeopt = 'exchange'
99
99
100 def isenabled(repo, option):
100 def isenabled(repo, option):
101 """Returns True if the given repository has the given obsolete option
101 """Returns True if the given repository has the given obsolete option
102 enabled.
102 enabled.
103 """
103 """
104 result = set(repo.ui.configlist('experimental', 'evolution'))
104 result = set(repo.ui.configlist('experimental', 'evolution'))
105 if 'all' in result:
105 if 'all' in result:
106 return True
106 return True
107
107
108 # For migration purposes, temporarily return true if the config hasn't been
108 # For migration purposes, temporarily return true if the config hasn't been
109 # set but _enabled is true.
109 # set but _enabled is true.
110 if len(result) == 0 and _enabled:
110 if len(result) == 0 and _enabled:
111 return True
111 return True
112
112
113 # createmarkers must be enabled if other options are enabled
113 # createmarkers must be enabled if other options are enabled
114 if ((allowunstableopt in result or exchangeopt in result) and
114 if ((allowunstableopt in result or exchangeopt in result) and
115 not createmarkersopt in result):
115 not createmarkersopt in result):
116 raise error.Abort(_("'createmarkers' obsolete option must be enabled "
116 raise error.Abort(_("'createmarkers' obsolete option must be enabled "
117 "if other obsolete options are enabled"))
117 "if other obsolete options are enabled"))
118
118
119 return option in result
119 return option in result
120
120
121 ### obsolescence marker flag
121 ### obsolescence marker flag
122
122
123 ## bumpedfix flag
123 ## bumpedfix flag
124 #
124 #
125 # When a changeset A' succeeds a changeset A which became public, we call A'
125 # When a changeset A' succeeds a changeset A which became public, we call A'
126 # "bumped" because it's a successor of a public changeset
126 # "bumped" because it's a successor of a public changeset
127 #
127 #
128 # o A' (bumped)
128 # o A' (bumped)
129 # |`:
129 # |`:
130 # | o A
130 # | o A
131 # |/
131 # |/
132 # o Z
132 # o Z
133 #
133 #
134 # The way to solve this situation is to create a new changeset Ad as a child
134 # The way to solve this situation is to create a new changeset Ad as a child
135 # of A. This changeset has the same content as A'. So the diff from A to A'
135 # of A. This changeset has the same content as A'. So the diff from A to A'
136 # is the same as the diff from A to Ad. Ad is marked as a successor of A'
136 # is the same as the diff from A to Ad. Ad is marked as a successor of A'
137 #
137 #
138 # o Ad
138 # o Ad
139 # |`:
139 # |`:
140 # | x A'
140 # | x A'
141 # |'|
141 # |'|
142 # o | A
142 # o | A
143 # |/
143 # |/
144 # o Z
144 # o Z
145 #
145 #
146 # But by transitivity Ad is also a successor of A. To avoid having Ad marked
146 # But by transitivity Ad is also a successor of A. To avoid having Ad marked
147 # as bumped too, we add the `bumpedfix` flag to the marker. <A', (Ad,)>.
147 # as bumped too, we add the `bumpedfix` flag to the marker. <A', (Ad,)>.
148 # This flag means that the successors express the changes between the public and
148 # This flag means that the successors express the changes between the public and
149 # bumped version and fix the situation, breaking the transitivity of
149 # bumped version and fix the situation, breaking the transitivity of
150 # "bumped" here.
150 # "bumped" here.
151 bumpedfix = 1
151 bumpedfix = 1
152 usingsha256 = 2
152 usingsha256 = 2
153
153
154 ## Parsing and writing of version "0"
154 ## Parsing and writing of version "0"
155 #
155 #
156 # The header is followed by the markers. Each marker is made of:
156 # The header is followed by the markers. Each marker is made of:
157 #
157 #
158 # - 1 uint8 : number of new changesets "N", can be zero.
158 # - 1 uint8 : number of new changesets "N", can be zero.
159 #
159 #
160 # - 1 uint32: metadata size "M" in bytes.
160 # - 1 uint32: metadata size "M" in bytes.
161 #
161 #
162 # - 1 byte: a bit field. It is reserved for flags used in common
162 # - 1 byte: a bit field. It is reserved for flags used in common
163 # obsolete marker operations, to avoid repeated decoding of metadata
163 # obsolete marker operations, to avoid repeated decoding of metadata
164 # entries.
164 # entries.
165 #
165 #
166 # - 20 bytes: obsoleted changeset identifier.
166 # - 20 bytes: obsoleted changeset identifier.
167 #
167 #
168 # - N*20 bytes: new changesets identifiers.
168 # - N*20 bytes: new changesets identifiers.
169 #
169 #
170 # - M bytes: metadata as a sequence of nul-terminated strings. Each
170 # - M bytes: metadata as a sequence of nul-terminated strings. Each
171 # string contains a key and a value, separated by a colon ':', without
171 # string contains a key and a value, separated by a colon ':', without
172 # additional encoding. Keys cannot contain '\0' or ':' and values
172 # additional encoding. Keys cannot contain '\0' or ':' and values
173 # cannot contain '\0'.
173 # cannot contain '\0'.
174 _fm0version = 0
174 _fm0version = 0
175 _fm0fixed = '>BIB20s'
175 _fm0fixed = '>BIB20s'
176 _fm0node = '20s'
176 _fm0node = '20s'
177 _fm0fsize = _calcsize(_fm0fixed)
177 _fm0fsize = _calcsize(_fm0fixed)
178 _fm0fnodesize = _calcsize(_fm0node)
178 _fm0fnodesize = _calcsize(_fm0node)
179
179
180 def _fm0readmarkers(data, off):
180 def _fm0readmarkers(data, off):
181 # Loop on markers
181 # Loop on markers
182 l = len(data)
182 l = len(data)
183 while off + _fm0fsize <= l:
183 while off + _fm0fsize <= l:
184 # read fixed part
184 # read fixed part
185 cur = data[off:off + _fm0fsize]
185 cur = data[off:off + _fm0fsize]
186 off += _fm0fsize
186 off += _fm0fsize
187 numsuc, mdsize, flags, pre = _unpack(_fm0fixed, cur)
187 numsuc, mdsize, flags, pre = _unpack(_fm0fixed, cur)
188 # read replacement
188 # read replacement
189 sucs = ()
189 sucs = ()
190 if numsuc:
190 if numsuc:
191 s = (_fm0fnodesize * numsuc)
191 s = (_fm0fnodesize * numsuc)
192 cur = data[off:off + s]
192 cur = data[off:off + s]
193 sucs = _unpack(_fm0node * numsuc, cur)
193 sucs = _unpack(_fm0node * numsuc, cur)
194 off += s
194 off += s
195 # read metadata
195 # read metadata
196 # (metadata will be decoded on demand)
196 # (metadata will be decoded on demand)
197 metadata = data[off:off + mdsize]
197 metadata = data[off:off + mdsize]
198 if len(metadata) != mdsize:
198 if len(metadata) != mdsize:
199 raise error.Abort(_('parsing obsolete marker: metadata is too '
199 raise error.Abort(_('parsing obsolete marker: metadata is too '
200 'short, %d bytes expected, got %d')
200 'short, %d bytes expected, got %d')
201 % (mdsize, len(metadata)))
201 % (mdsize, len(metadata)))
202 off += mdsize
202 off += mdsize
203 metadata = _fm0decodemeta(metadata)
203 metadata = _fm0decodemeta(metadata)
204 try:
204 try:
205 when, offset = metadata.pop('date', '0 0').split(' ')
205 when, offset = metadata.pop('date', '0 0').split(' ')
206 date = float(when), int(offset)
206 date = float(when), int(offset)
207 except ValueError:
207 except ValueError:
208 date = (0., 0)
208 date = (0., 0)
209 parents = None
209 parents = None
210 if 'p2' in metadata:
210 if 'p2' in metadata:
211 parents = (metadata.pop('p1', None), metadata.pop('p2', None))
211 parents = (metadata.pop('p1', None), metadata.pop('p2', None))
212 elif 'p1' in metadata:
212 elif 'p1' in metadata:
213 parents = (metadata.pop('p1', None),)
213 parents = (metadata.pop('p1', None),)
214 elif 'p0' in metadata:
214 elif 'p0' in metadata:
215 parents = ()
215 parents = ()
216 if parents is not None:
216 if parents is not None:
217 try:
217 try:
218 parents = tuple(node.bin(p) for p in parents)
218 parents = tuple(node.bin(p) for p in parents)
219 # if parent content is not a nodeid, drop the data
219 # if parent content is not a nodeid, drop the data
220 for p in parents:
220 for p in parents:
221 if len(p) != 20:
221 if len(p) != 20:
222 parents = None
222 parents = None
223 break
223 break
224 except TypeError:
224 except TypeError:
225 # if content cannot be translated to nodeid drop the data.
225 # if content cannot be translated to nodeid drop the data.
226 parents = None
226 parents = None
227
227
228 metadata = tuple(sorted(metadata.iteritems()))
228 metadata = tuple(sorted(metadata.iteritems()))
229
229
230 yield (pre, sucs, flags, metadata, date, parents)
230 yield (pre, sucs, flags, metadata, date, parents)
231
231
232 def _fm0encodeonemarker(marker):
232 def _fm0encodeonemarker(marker):
233 pre, sucs, flags, metadata, date, parents = marker
233 pre, sucs, flags, metadata, date, parents = marker
234 if flags & usingsha256:
234 if flags & usingsha256:
235 raise error.Abort(_('cannot handle sha256 with old obsstore format'))
235 raise error.Abort(_('cannot handle sha256 with old obsstore format'))
236 metadata = dict(metadata)
236 metadata = dict(metadata)
237 time, tz = date
237 time, tz = date
238 metadata['date'] = '%r %i' % (time, tz)
238 metadata['date'] = '%r %i' % (time, tz)
239 if parents is not None:
239 if parents is not None:
240 if not parents:
240 if not parents:
241 # mark that we explicitly recorded no parents
241 # mark that we explicitly recorded no parents
242 metadata['p0'] = ''
242 metadata['p0'] = ''
243 for i, p in enumerate(parents, 1):
243 for i, p in enumerate(parents, 1):
244 metadata['p%i' % i] = node.hex(p)
244 metadata['p%i' % i] = node.hex(p)
245 metadata = _fm0encodemeta(metadata)
245 metadata = _fm0encodemeta(metadata)
246 numsuc = len(sucs)
246 numsuc = len(sucs)
247 format = _fm0fixed + (_fm0node * numsuc)
247 format = _fm0fixed + (_fm0node * numsuc)
248 data = [numsuc, len(metadata), flags, pre]
248 data = [numsuc, len(metadata), flags, pre]
249 data.extend(sucs)
249 data.extend(sucs)
250 return _pack(format, *data) + metadata
250 return _pack(format, *data) + metadata
251
251
252 def _fm0encodemeta(meta):
252 def _fm0encodemeta(meta):
253 """Return encoded metadata string to string mapping.
253 """Return encoded metadata string to string mapping.
254
254
255 Assume no ':' in key and no '\0' in both key and value."""
255 Assume no ':' in key and no '\0' in both key and value."""
256 for key, value in meta.iteritems():
256 for key, value in meta.iteritems():
257 if ':' in key or '\0' in key:
257 if ':' in key or '\0' in key:
258 raise ValueError("':' and '\0' are forbidden in metadata key")
258 raise ValueError("':' and '\0' are forbidden in metadata key")
259 if '\0' in value:
259 if '\0' in value:
260 raise ValueError("'\0' is forbidden in metadata value")
260 raise ValueError("'\0' is forbidden in metadata value")
261 return '\0'.join(['%s:%s' % (k, meta[k]) for k in sorted(meta)])
261 return '\0'.join(['%s:%s' % (k, meta[k]) for k in sorted(meta)])
262
262
263 def _fm0decodemeta(data):
263 def _fm0decodemeta(data):
264 """Return string to string dictionary from encoded version."""
264 """Return string to string dictionary from encoded version."""
265 d = {}
265 d = {}
266 for l in data.split('\0'):
266 for l in data.split('\0'):
267 if l:
267 if l:
268 key, value = l.split(':')
268 key, value = l.split(':')
269 d[key] = value
269 d[key] = value
270 return d
270 return d
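A minimal round-trip sketch through the two helpers above, assuming keys contain neither ':' nor '\0' and values contain no '\0'; the key/value strings used here are placeholders.

    meta = {'user': 'alice', 'note': 'rebased'}
    blob = _fm0encodemeta(meta)        # 'note:rebased\0user:alice' (keys sorted)
    assert _fm0decodemeta(blob) == meta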
271
271
272 ## Parsing and writing of version "1"
272 ## Parsing and writing of version "1"
273 #
273 #
274 # The header is followed by the markers. Each marker is made of:
274 # The header is followed by the markers. Each marker is made of:
275 #
275 #
276 # - uint32: total size of the marker (including this field)
276 # - uint32: total size of the marker (including this field)
277 #
277 #
278 # - float64: date in seconds since epoch
278 # - float64: date in seconds since epoch
279 #
279 #
280 # - int16: timezone offset in minutes
280 # - int16: timezone offset in minutes
281 #
281 #
282 # - uint16: a bit field. It is reserved for flags used in common
282 # - uint16: a bit field. It is reserved for flags used in common
283 # obsolete marker operations, to avoid repeated decoding of metadata
283 # obsolete marker operations, to avoid repeated decoding of metadata
284 # entries.
284 # entries.
285 #
285 #
286 # - uint8: number of successors "N", can be zero.
286 # - uint8: number of successors "N", can be zero.
287 #
287 #
288 # - uint8: number of parents "P", can be zero.
288 # - uint8: number of parents "P", can be zero.
289 #
289 #
290 # 0: parents data stored but no parent,
290 # 0: parents data stored but no parent,
291 # 1: one parent stored,
291 # 1: one parent stored,
292 # 2: two parents stored,
292 # 2: two parents stored,
293 # 3: no parent data stored
293 # 3: no parent data stored
294 #
294 #
295 # - uint8: number of metadata entries M
295 # - uint8: number of metadata entries M
296 #
296 #
297 # - 20 or 32 bytes: precursor changeset identifier.
297 # - 20 or 32 bytes: precursor changeset identifier.
298 #
298 #
299 # - N*(20 or 32) bytes: successors changesets identifiers.
299 # - N*(20 or 32) bytes: successors changesets identifiers.
300 #
300 #
301 # - P*(20 or 32) bytes: parents of the precursors changesets.
301 # - P*(20 or 32) bytes: parents of the precursors changesets.
302 #
302 #
303 # - M*(uint8, uint8): size of all metadata entries (key and value)
303 # - M*(uint8, uint8): size of all metadata entries (key and value)
304 #
304 #
305 # - remaining bytes: the metadata, each (key, value) pair after the other.
305 # - remaining bytes: the metadata, each (key, value) pair after the other.
306 _fm1version = 1
306 _fm1version = 1
307 _fm1fixed = '>IdhHBBB20s'
307 _fm1fixed = '>IdhHBBB20s'
308 _fm1nodesha1 = '20s'
308 _fm1nodesha1 = '20s'
309 _fm1nodesha256 = '32s'
309 _fm1nodesha256 = '32s'
310 _fm1nodesha1size = _calcsize(_fm1nodesha1)
310 _fm1nodesha1size = _calcsize(_fm1nodesha1)
311 _fm1nodesha256size = _calcsize(_fm1nodesha256)
311 _fm1nodesha256size = _calcsize(_fm1nodesha256)
312 _fm1fsize = _calcsize(_fm1fixed)
312 _fm1fsize = _calcsize(_fm1fixed)
313 _fm1parentnone = 3
313 _fm1parentnone = 3
314 _fm1parentshift = 14
314 _fm1parentshift = 14
315 _fm1parentmask = (_fm1parentnone << _fm1parentshift)
315 _fm1parentmask = (_fm1parentnone << _fm1parentshift)
316 _fm1metapair = 'BB'
316 _fm1metapair = 'BB'
317 _fm1metapairsize = _calcsize('BB')
317 _fm1metapairsize = _calcsize('BB')
318
318
319 def _fm1purereadmarkers(data, off):
319 def _fm1purereadmarkers(data, off):
320 # make some global constants local for performance
320 # make some global constants local for performance
321 noneflag = _fm1parentnone
321 noneflag = _fm1parentnone
322 sha2flag = usingsha256
322 sha2flag = usingsha256
323 sha1size = _fm1nodesha1size
323 sha1size = _fm1nodesha1size
324 sha2size = _fm1nodesha256size
324 sha2size = _fm1nodesha256size
325 sha1fmt = _fm1nodesha1
325 sha1fmt = _fm1nodesha1
326 sha2fmt = _fm1nodesha256
326 sha2fmt = _fm1nodesha256
327 metasize = _fm1metapairsize
327 metasize = _fm1metapairsize
328 metafmt = _fm1metapair
328 metafmt = _fm1metapair
329 fsize = _fm1fsize
329 fsize = _fm1fsize
330 unpack = _unpack
330 unpack = _unpack
331
331
332 # Loop on markers
332 # Loop on markers
333 stop = len(data) - _fm1fsize
333 stop = len(data) - _fm1fsize
334 ufixed = struct.Struct(_fm1fixed).unpack
334 ufixed = struct.Struct(_fm1fixed).unpack
335
335
336 while off <= stop:
336 while off <= stop:
337 # read fixed part
337 # read fixed part
338 o1 = off + fsize
338 o1 = off + fsize
339 t, secs, tz, flags, numsuc, numpar, nummeta, prec = ufixed(data[off:o1])
339 t, secs, tz, flags, numsuc, numpar, nummeta, prec = ufixed(data[off:o1])
340
340
341 if flags & sha2flag:
341 if flags & sha2flag:
342 # FIXME: prec was read as a SHA1, needs to be amended
342 # FIXME: prec was read as a SHA1, needs to be amended
343
343
344 # read 0 or more successors
344 # read 0 or more successors
345 if numsuc == 1:
345 if numsuc == 1:
346 o2 = o1 + sha2size
346 o2 = o1 + sha2size
347 sucs = (data[o1:o2],)
347 sucs = (data[o1:o2],)
348 else:
348 else:
349 o2 = o1 + sha2size * numsuc
349 o2 = o1 + sha2size * numsuc
350 sucs = unpack(sha2fmt * numsuc, data[o1:o2])
350 sucs = unpack(sha2fmt * numsuc, data[o1:o2])
351
351
352 # read parents
352 # read parents
353 if numpar == noneflag:
353 if numpar == noneflag:
354 o3 = o2
354 o3 = o2
355 parents = None
355 parents = None
356 elif numpar == 1:
356 elif numpar == 1:
357 o3 = o2 + sha2size
357 o3 = o2 + sha2size
358 parents = (data[o2:o3],)
358 parents = (data[o2:o3],)
359 else:
359 else:
360 o3 = o2 + sha2size * numpar
360 o3 = o2 + sha2size * numpar
361 parents = unpack(sha2fmt * numpar, data[o2:o3])
361 parents = unpack(sha2fmt * numpar, data[o2:o3])
362 else:
362 else:
363 # read 0 or more successors
363 # read 0 or more successors
364 if numsuc == 1:
364 if numsuc == 1:
365 o2 = o1 + sha1size
365 o2 = o1 + sha1size
366 sucs = (data[o1:o2],)
366 sucs = (data[o1:o2],)
367 else:
367 else:
368 o2 = o1 + sha1size * numsuc
368 o2 = o1 + sha1size * numsuc
369 sucs = unpack(sha1fmt * numsuc, data[o1:o2])
369 sucs = unpack(sha1fmt * numsuc, data[o1:o2])
370
370
371 # read parents
371 # read parents
372 if numpar == noneflag:
372 if numpar == noneflag:
373 o3 = o2
373 o3 = o2
374 parents = None
374 parents = None
375 elif numpar == 1:
375 elif numpar == 1:
376 o3 = o2 + sha1size
376 o3 = o2 + sha1size
377 parents = (data[o2:o3],)
377 parents = (data[o2:o3],)
378 else:
378 else:
379 o3 = o2 + sha1size * numpar
379 o3 = o2 + sha1size * numpar
380 parents = unpack(sha1fmt * numpar, data[o2:o3])
380 parents = unpack(sha1fmt * numpar, data[o2:o3])
381
381
382 # read metadata
382 # read metadata
383 off = o3 + metasize * nummeta
383 off = o3 + metasize * nummeta
384 metapairsize = unpack('>' + (metafmt * nummeta), data[o3:off])
384 metapairsize = unpack('>' + (metafmt * nummeta), data[o3:off])
385 metadata = []
385 metadata = []
386 for idx in xrange(0, len(metapairsize), 2):
386 for idx in xrange(0, len(metapairsize), 2):
387 o1 = off + metapairsize[idx]
387 o1 = off + metapairsize[idx]
388 o2 = o1 + metapairsize[idx + 1]
388 o2 = o1 + metapairsize[idx + 1]
389 metadata.append((data[off:o1], data[o1:o2]))
389 metadata.append((data[off:o1], data[o1:o2]))
390 off = o2
390 off = o2
391
391
392 yield (prec, sucs, flags, tuple(metadata), (secs, tz * 60), parents)
392 yield (prec, sucs, flags, tuple(metadata), (secs, tz * 60), parents)
393
393
394 def _fm1encodeonemarker(marker):
394 def _fm1encodeonemarker(marker):
395 pre, sucs, flags, metadata, date, parents = marker
395 pre, sucs, flags, metadata, date, parents = marker
396 # determine node size
396 # determine node size
397 _fm1node = _fm1nodesha1
397 _fm1node = _fm1nodesha1
398 if flags & usingsha256:
398 if flags & usingsha256:
399 _fm1node = _fm1nodesha256
399 _fm1node = _fm1nodesha256
400 numsuc = len(sucs)
400 numsuc = len(sucs)
401 numextranodes = numsuc
401 numextranodes = numsuc
402 if parents is None:
402 if parents is None:
403 numpar = _fm1parentnone
403 numpar = _fm1parentnone
404 else:
404 else:
405 numpar = len(parents)
405 numpar = len(parents)
406 numextranodes += numpar
406 numextranodes += numpar
407 formatnodes = _fm1node * numextranodes
407 formatnodes = _fm1node * numextranodes
408 formatmeta = _fm1metapair * len(metadata)
408 formatmeta = _fm1metapair * len(metadata)
409 format = _fm1fixed + formatnodes + formatmeta
409 format = _fm1fixed + formatnodes + formatmeta
410 # tz is stored in minutes so we divide by 60
410 # tz is stored in minutes so we divide by 60
411 tz = date[1]//60
411 tz = date[1]//60
412 data = [None, date[0], tz, flags, numsuc, numpar, len(metadata), pre]
412 data = [None, date[0], tz, flags, numsuc, numpar, len(metadata), pre]
413 data.extend(sucs)
413 data.extend(sucs)
414 if parents is not None:
414 if parents is not None:
415 data.extend(parents)
415 data.extend(parents)
416 totalsize = _calcsize(format)
416 totalsize = _calcsize(format)
417 for key, value in metadata:
417 for key, value in metadata:
418 lk = len(key)
418 lk = len(key)
419 lv = len(value)
419 lv = len(value)
420 data.append(lk)
420 data.append(lk)
421 data.append(lv)
421 data.append(lv)
422 totalsize += lk + lv
422 totalsize += lk + lv
423 data[0] = totalsize
423 data[0] = totalsize
424 data = [_pack(format, *data)]
424 data = [_pack(format, *data)]
425 for key, value in metadata:
425 for key, value in metadata:
426 data.append(key)
426 data.append(key)
427 data.append(value)
427 data.append(value)
428 return ''.join(data)
428 return ''.join(data)
429
429
430 def _fm1readmarkers(data, off):
430 def _fm1readmarkers(data, off):
431 native = getattr(parsers, 'fm1readmarkers', None)
431 native = getattr(parsers, 'fm1readmarkers', None)
432 if not native:
432 if not native:
433 return _fm1purereadmarkers(data, off)
433 return _fm1purereadmarkers(data, off)
434 stop = len(data) - _fm1fsize
434 stop = len(data) - _fm1fsize
435 return native(data, off, stop)
435 return native(data, off, stop)
436
436
437 # mapping to read/write various marker formats
437 # mapping to read/write various marker formats
438 # <version> -> (decoder, encoder)
438 # <version> -> (decoder, encoder)
439 formats = {_fm0version: (_fm0readmarkers, _fm0encodeonemarker),
439 formats = {_fm0version: (_fm0readmarkers, _fm0encodeonemarker),
440 _fm1version: (_fm1readmarkers, _fm1encodeonemarker)}
440 _fm1version: (_fm1readmarkers, _fm1encodeonemarker)}
441
441
442 @util.nogc
442 @util.nogc
443 def _readmarkers(data):
443 def _readmarkers(data):
444 """Read and enumerate markers from raw data"""
444 """Read and enumerate markers from raw data"""
445 off = 0
445 off = 0
446 diskversion = _unpack('>B', data[off:off + 1])[0]
446 diskversion = _unpack('>B', data[off:off + 1])[0]
447 off += 1
447 off += 1
448 if diskversion not in formats:
448 if diskversion not in formats:
449 raise error.Abort(_('parsing obsolete marker: unknown version %r')
449 raise error.Abort(_('parsing obsolete marker: unknown version %r')
450 % diskversion)
450 % diskversion)
451 return diskversion, formats[diskversion][0](data, off)
451 return diskversion, formats[diskversion][0](data, off)
452
452
453 def encodemarkers(markers, addheader=False, version=_fm0version):
453 def encodemarkers(markers, addheader=False, version=_fm0version):
454 # Kept separate from flushmarkers(), it will be reused for
454 # Kept separate from flushmarkers(), it will be reused for
455 # markers exchange.
455 # markers exchange.
456 encodeone = formats[version][1]
456 encodeone = formats[version][1]
457 if addheader:
457 if addheader:
458 yield _pack('>B', version)
458 yield _pack('>B', version)
459 for marker in markers:
459 for marker in markers:
460 yield encodeone(marker)
460 yield encodeone(marker)
461
461
462
462
463 class marker(object):
463 class marker(object):
464 """Wrap obsolete marker raw data"""
464 """Wrap obsolete marker raw data"""
465
465
466 def __init__(self, repo, data):
466 def __init__(self, repo, data):
467 # the repo argument will be used to create changectx in later version
467 # the repo argument will be used to create changectx in later version
468 self._repo = repo
468 self._repo = repo
469 self._data = data
469 self._data = data
470 self._decodedmeta = None
470 self._decodedmeta = None
471
471
472 def __hash__(self):
472 def __hash__(self):
473 return hash(self._data)
473 return hash(self._data)
474
474
475 def __eq__(self, other):
475 def __eq__(self, other):
476 if type(other) != type(self):
476 if type(other) != type(self):
477 return False
477 return False
478 return self._data == other._data
478 return self._data == other._data
479
479
480 def precnode(self):
480 def precnode(self):
481 """Precursor changeset node identifier"""
481 """Precursor changeset node identifier"""
482 return self._data[0]
482 return self._data[0]
483
483
484 def succnodes(self):
484 def succnodes(self):
485 """List of successor changesets node identifiers"""
485 """List of successor changesets node identifiers"""
486 return self._data[1]
486 return self._data[1]
487
487
488 def parentnodes(self):
488 def parentnodes(self):
489 """Parents of the precursors (None if not recorded)"""
489 """Parents of the precursors (None if not recorded)"""
490 return self._data[5]
490 return self._data[5]
491
491
492 def metadata(self):
492 def metadata(self):
493 """Decoded metadata dictionary"""
493 """Decoded metadata dictionary"""
494 return dict(self._data[3])
494 return dict(self._data[3])
495
495
496 def date(self):
496 def date(self):
497 """Creation date as (unixtime, offset)"""
497 """Creation date as (unixtime, offset)"""
498 return self._data[4]
498 return self._data[4]
499
499
500 def flags(self):
500 def flags(self):
501 """The flags field of the marker"""
501 """The flags field of the marker"""
502 return self._data[2]
502 return self._data[2]
503
503
504 @util.nogc
504 @util.nogc
505 def _addsuccessors(successors, markers):
505 def _addsuccessors(successors, markers):
506 for mark in markers:
506 for mark in markers:
507 successors.setdefault(mark[0], set()).add(mark)
507 successors.setdefault(mark[0], set()).add(mark)
508
508
509 @util.nogc
509 @util.nogc
510 def _addprecursors(precursors, markers):
510 def _addprecursors(precursors, markers):
511 for mark in markers:
511 for mark in markers:
512 for suc in mark[1]:
512 for suc in mark[1]:
513 precursors.setdefault(suc, set()).add(mark)
513 precursors.setdefault(suc, set()).add(mark)
514
514
515 @util.nogc
515 @util.nogc
516 def _addchildren(children, markers):
516 def _addchildren(children, markers):
517 for mark in markers:
517 for mark in markers:
518 parents = mark[5]
518 parents = mark[5]
519 if parents is not None:
519 if parents is not None:
520 for p in parents:
520 for p in parents:
521 children.setdefault(p, set()).add(mark)
521 children.setdefault(p, set()).add(mark)
522
522
523 def _checkinvalidmarkers(markers):
523 def _checkinvalidmarkers(markers):
524 """search for marker with invalid data and raise error if needed
524 """search for marker with invalid data and raise error if needed
525
525
526 Exist as a separated function to allow the evolve extension for a more
526 Exist as a separated function to allow the evolve extension for a more
527 subtle handling.
527 subtle handling.
528 """
528 """
529 for mark in markers:
529 for mark in markers:
530 if node.nullid in mark[1]:
530 if node.nullid in mark[1]:
531 raise error.Abort(_('bad obsolescence marker detected: '
531 raise error.Abort(_('bad obsolescence marker detected: '
532 'invalid successors nullid'))
532 'invalid successors nullid'))
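A minimal sketch of how the three index helpers above file a single marker, using placeholder strings where real markers carry 20-byte binary node ids.

    m = ('A', ('B',), 0, (), (0.0, 0), ('P',))   # (prec, succs, flag, meta, date, parents)
    successors, precursors, children = {}, {}, {}
    _addsuccessors(successors, [m])   # successors['A'] == {m}
    _addprecursors(precursors, [m])   # precursors['B'] == {m}
    _addchildren(children, [m])       # children['P'] == {m}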
533
533
534 class obsstore(object):
534 class obsstore(object):
535 """Store obsolete markers
535 """Store obsolete markers
536
536
537 Markers can be accessed with three mappings:
537 Markers can be accessed with three mappings:
538 - precursors[x] -> set(markers on precursors edges of x)
538 - precursors[x] -> set(markers on precursors edges of x)
539 - successors[x] -> set(markers on successors edges of x)
539 - successors[x] -> set(markers on successors edges of x)
540 - children[x] -> set(markers on precursors edges of children(x))
540 - children[x] -> set(markers on precursors edges of children(x))
541 """
541 """
542
542
543 fields = ('prec', 'succs', 'flag', 'meta', 'date', 'parents')
543 fields = ('prec', 'succs', 'flag', 'meta', 'date', 'parents')
544 # prec: nodeid, precursor changesets
544 # prec: nodeid, precursor changesets
545 # succs: tuple of nodeid, successor changesets (0-N length)
545 # succs: tuple of nodeid, successor changesets (0-N length)
546 # flag: integer, flag field carrying modifier for the markers (see doc)
546 # flag: integer, flag field carrying modifier for the markers (see doc)
547 # meta: binary blob, encoded metadata dictionary
547 # meta: binary blob, encoded metadata dictionary
548 # date: (float, int) tuple, date of marker creation
548 # date: (float, int) tuple, date of marker creation
549 # parents: (tuple of nodeid) or None, parents of precursors
549 # parents: (tuple of nodeid) or None, parents of precursors
550 # None is used when no data has been recorded
550 # None is used when no data has been recorded
551
551
552 def __init__(self, svfs, defaultformat=_fm1version, readonly=False):
552 def __init__(self, svfs, defaultformat=_fm1version, readonly=False):
553 # caches for various obsolescence related cache
553 # caches for various obsolescence related cache
554 self.caches = {}
554 self.caches = {}
555 self.svfs = svfs
555 self.svfs = svfs
556 self._version = defaultformat
556 self._version = defaultformat
557 self._readonly = readonly
557 self._readonly = readonly
558
558
559 def __iter__(self):
559 def __iter__(self):
560 return iter(self._all)
560 return iter(self._all)
561
561
562 def __len__(self):
562 def __len__(self):
563 return len(self._all)
563 return len(self._all)
564
564
565 def __nonzero__(self):
565 def __nonzero__(self):
566 if not self._cached('_all'):
566 if not self._cached('_all'):
567 try:
567 try:
568 return self.svfs.stat('obsstore').st_size > 1
568 return self.svfs.stat('obsstore').st_size > 1
569 except OSError as inst:
569 except OSError as inst:
570 if inst.errno != errno.ENOENT:
570 if inst.errno != errno.ENOENT:
571 raise
571 raise
572 # just build an empty _all list if no obsstore exists, which
572 # just build an empty _all list if no obsstore exists, which
573 # avoids further stat() syscalls
573 # avoids further stat() syscalls
574 pass
574 pass
575 return bool(self._all)
575 return bool(self._all)
576
576
577 __bool__ = __nonzero__
577 __bool__ = __nonzero__
578
578
579 @property
579 @property
580 def readonly(self):
580 def readonly(self):
581 """True if marker creation is disabled
581 """True if marker creation is disabled
582
582
583 Remove me in the future when obsolete marker is always on."""
583 Remove me in the future when obsolete marker is always on."""
584 return self._readonly
584 return self._readonly
585
585
586 def create(self, transaction, prec, succs=(), flag=0, parents=None,
586 def create(self, transaction, prec, succs=(), flag=0, parents=None,
587 date=None, metadata=None):
587 date=None, metadata=None):
588 """obsolete: add a new obsolete marker
588 """obsolete: add a new obsolete marker
589
589
590 * ensuring it is hashable
590 * ensuring it is hashable
591 * check mandatory metadata
591 * check mandatory metadata
592 * encode metadata
592 * encode metadata
593
593
594 If you are a human writing code that creates markers, you want to use the
594 If you are a human writing code that creates markers, you want to use the
595 `createmarkers` function in this module instead.
595 `createmarkers` function in this module instead.
596
596
597 return True if a new marker has been added, False if the markers
597 return True if a new marker has been added, False if the markers
598 already existed (no op).
598 already existed (no op).
599 """
599 """
600 if metadata is None:
600 if metadata is None:
601 metadata = {}
601 metadata = {}
602 if date is None:
602 if date is None:
603 if 'date' in metadata:
603 if 'date' in metadata:
604 # as a courtesy for out-of-tree extensions
604 # as a courtesy for out-of-tree extensions
605 date = util.parsedate(metadata.pop('date'))
605 date = util.parsedate(metadata.pop('date'))
606 else:
606 else:
607 date = util.makedate()
607 date = util.makedate()
608 if len(prec) != 20:
608 if len(prec) != 20:
609 raise ValueError(prec)
609 raise ValueError(prec)
610 for succ in succs:
610 for succ in succs:
611 if len(succ) != 20:
611 if len(succ) != 20:
612 raise ValueError(succ)
612 raise ValueError(succ)
613 if prec in succs:
613 if prec in succs:
614 raise ValueError(_('in-marker cycle with %s') % node.hex(prec))
614 raise ValueError(_('in-marker cycle with %s') % node.hex(prec))
615
615
616 metadata = tuple(sorted(metadata.iteritems()))
616 metadata = tuple(sorted(metadata.iteritems()))
617
617
618 marker = (str(prec), tuple(succs), int(flag), metadata, date, parents)
618 marker = (str(prec), tuple(succs), int(flag), metadata, date, parents)
619 return bool(self.add(transaction, [marker]))
619 return bool(self.add(transaction, [marker]))
620
620
621 def add(self, transaction, markers):
621 def add(self, transaction, markers):
622 """Add new markers to the store
622 """Add new markers to the store
623
623
624 Take care of filtering duplicates.
624 Take care of filtering duplicates.
625 Return the number of new markers."""
625 Return the number of new markers."""
626 if self._readonly:
626 if self._readonly:
627 raise error.Abort(_('creating obsolete markers is not enabled on '
627 raise error.Abort(_('creating obsolete markers is not enabled on '
628 'this repo'))
628 'this repo'))
629 known = set(self._all)
629 known = set(self._all)
630 new = []
630 new = []
631 for m in markers:
631 for m in markers:
632 if m not in known:
632 if m not in known:
633 known.add(m)
633 known.add(m)
634 new.append(m)
634 new.append(m)
635 if new:
635 if new:
636 f = self.svfs('obsstore', 'ab')
636 f = self.svfs('obsstore', 'ab')
637 try:
637 try:
638 offset = f.tell()
638 offset = f.tell()
639 transaction.add('obsstore', offset)
639 transaction.add('obsstore', offset)
640 # offset == 0: new file - add the version header
640 # offset == 0: new file - add the version header
641 for bytes in encodemarkers(new, offset == 0, self._version):
641 for bytes in encodemarkers(new, offset == 0, self._version):
642 f.write(bytes)
642 f.write(bytes)
643 finally:
643 finally:
644 # XXX: f.close() == filecache invalidation == obsstore rebuilt.
644 # XXX: f.close() == filecache invalidation == obsstore rebuilt.
645 # call 'filecacheentry.refresh()' here
645 # call 'filecacheentry.refresh()' here
646 f.close()
646 f.close()
647 self._addmarkers(new)
647 self._addmarkers(new)
648 # new markers *may* have changed several sets. invalidate the cache.
648 # new markers *may* have changed several sets. invalidate the cache.
649 self.caches.clear()
649 self.caches.clear()
650 # records the number of new markers for the transaction hooks
650 # records the number of new markers for the transaction hooks
651 previous = int(transaction.hookargs.get('new_obsmarkers', '0'))
651 previous = int(transaction.hookargs.get('new_obsmarkers', '0'))
652 transaction.hookargs['new_obsmarkers'] = str(previous + len(new))
652 transaction.hookargs['new_obsmarkers'] = str(previous + len(new))
653 return len(new)
653 return len(new)
654
654
655 def mergemarkers(self, transaction, data):
655 def mergemarkers(self, transaction, data):
656 """merge a binary stream of markers inside the obsstore
656 """merge a binary stream of markers inside the obsstore
657
657
658 Returns the number of new markers added."""
658 Returns the number of new markers added."""
659 version, markers = _readmarkers(data)
659 version, markers = _readmarkers(data)
660 return self.add(transaction, markers)
660 return self.add(transaction, markers)
661
661
662 @propertycache
662 @propertycache
663 def _all(self):
663 def _all(self):
664 data = self.svfs.tryread('obsstore')
664 data = self.svfs.tryread('obsstore')
665 if not data:
665 if not data:
666 return []
666 return []
667 self._version, markers = _readmarkers(data)
667 self._version, markers = _readmarkers(data)
668 markers = list(markers)
668 markers = list(markers)
669 _checkinvalidmarkers(markers)
669 _checkinvalidmarkers(markers)
670 return markers
670 return markers
671
671
672 @propertycache
672 @propertycache
673 def successors(self):
673 def successors(self):
674 successors = {}
674 successors = {}
675 _addsuccessors(successors, self._all)
675 _addsuccessors(successors, self._all)
676 return successors
676 return successors
677
677
678 @propertycache
678 @propertycache
679 def precursors(self):
679 def precursors(self):
680 precursors = {}
680 precursors = {}
681 _addprecursors(precursors, self._all)
681 _addprecursors(precursors, self._all)
682 return precursors
682 return precursors
683
683
684 @propertycache
684 @propertycache
685 def children(self):
685 def children(self):
686 children = {}
686 children = {}
687 _addchildren(children, self._all)
687 _addchildren(children, self._all)
688 return children
688 return children
689
689
690 def _cached(self, attr):
690 def _cached(self, attr):
691 return attr in self.__dict__
691 return attr in self.__dict__
692
692
693 def _addmarkers(self, markers):
693 def _addmarkers(self, markers):
694 markers = list(markers) # to allow repeated iteration
694 markers = list(markers) # to allow repeated iteration
695 self._all.extend(markers)
695 self._all.extend(markers)
696 if self._cached('successors'):
696 if self._cached('successors'):
697 _addsuccessors(self.successors, markers)
697 _addsuccessors(self.successors, markers)
698 if self._cached('precursors'):
698 if self._cached('precursors'):
699 _addprecursors(self.precursors, markers)
699 _addprecursors(self.precursors, markers)
700 if self._cached('children'):
700 if self._cached('children'):
701 _addchildren(self.children, markers)
701 _addchildren(self.children, markers)
702 _checkinvalidmarkers(markers)
702 _checkinvalidmarkers(markers)
703
703
704 def relevantmarkers(self, nodes):
704 def relevantmarkers(self, nodes):
705 """return a set of all obsolescence markers relevant to a set of nodes.
705 """return a set of all obsolescence markers relevant to a set of nodes.
706
706
707 "relevant" to a set of nodes mean:
707 "relevant" to a set of nodes mean:
708
708
709 - marker that use this changeset as successor
709 - marker that use this changeset as successor
710 - prune marker of direct children on this changeset
710 - prune marker of direct children on this changeset
711 - recursive application of the two rules on precursors of these markers
711 - recursive application of the two rules on precursors of these markers
712
712
713 It is a set so you cannot rely on order."""
713 It is a set so you cannot rely on order."""
714
714
715 pendingnodes = set(nodes)
715 pendingnodes = set(nodes)
716 seenmarkers = set()
716 seenmarkers = set()
717 seennodes = set(pendingnodes)
717 seennodes = set(pendingnodes)
718 precursorsmarkers = self.precursors
718 precursorsmarkers = self.precursors
719 children = self.children
719 children = self.children
720 while pendingnodes:
720 while pendingnodes:
721 direct = set()
721 direct = set()
722 for current in pendingnodes:
722 for current in pendingnodes:
723 direct.update(precursorsmarkers.get(current, ()))
723 direct.update(precursorsmarkers.get(current, ()))
724 pruned = [m for m in children.get(current, ()) if not m[1]]
724 pruned = [m for m in children.get(current, ()) if not m[1]]
725 direct.update(pruned)
725 direct.update(pruned)
726 direct -= seenmarkers
726 direct -= seenmarkers
727 pendingnodes = set([m[0] for m in direct])
727 pendingnodes = set([m[0] for m in direct])
728 seenmarkers |= direct
728 seenmarkers |= direct
729 pendingnodes -= seennodes
729 pendingnodes -= seennodes
730 seennodes |= pendingnodes
730 seennodes |= pendingnodes
731 return seenmarkers
731 return seenmarkers
732
732
733 def commonversion(versions):
733 def commonversion(versions):
734 """Return the newest version listed in both versions and our local formats.
734 """Return the newest version listed in both versions and our local formats.
735
735
736 Returns None if no common version exists.
736 Returns None if no common version exists.
737 """
737 """
738 versions.sort(reverse=True)
738 versions.sort(reverse=True)
739 # search for highest version known on both sides
739 # search for highest version known on both sides
740 for v in versions:
740 for v in versions:
741 if v in formats:
741 if v in formats:
742 return v
742 return v
743 return None
743 return None
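A brief usage sketch of commonversion(): it picks the highest version advertised by the peer that also appears in the local `formats` mapping above (versions 0 and 1 here); the argument lists are made up for illustration.

    assert commonversion([0, 1]) == 1      # both sides know fm1
    assert commonversion([0]) == 0         # peer only speaks fm0
    assert commonversion([2, 3]) is None   # no version in common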
744
744
745 # arbitrarily picked to fit into the 8K limit from the HTTP server
745 # arbitrarily picked to fit into the 8K limit from the HTTP server
746 # you have to take into account:
746 # you have to take into account:
747 # - the version header
747 # - the version header
748 # - the base85 encoding
748 # - the base85 encoding
749 _maxpayload = 5300
749 _maxpayload = 5300
750
750
751 def _pushkeyescape(markers):
751 def _pushkeyescape(markers):
752 """encode markers into a dict suitable for pushkey exchange
752 """encode markers into a dict suitable for pushkey exchange
753
753
754 - binary data is base85 encoded
754 - binary data is base85 encoded
755 - split into chunks smaller than 5300 bytes"""
755 - split into chunks smaller than 5300 bytes"""
756 keys = {}
756 keys = {}
757 parts = []
757 parts = []
758 currentlen = _maxpayload * 2 # ensure we create a new part
758 currentlen = _maxpayload * 2 # ensure we create a new part
759 for marker in markers:
759 for marker in markers:
760 nextdata = _fm0encodeonemarker(marker)
760 nextdata = _fm0encodeonemarker(marker)
761 if (len(nextdata) + currentlen > _maxpayload):
761 if (len(nextdata) + currentlen > _maxpayload):
762 currentpart = []
762 currentpart = []
763 currentlen = 0
763 currentlen = 0
764 parts.append(currentpart)
764 parts.append(currentpart)
765 currentpart.append(nextdata)
765 currentpart.append(nextdata)
766 currentlen += len(nextdata)
766 currentlen += len(nextdata)
767 for idx, part in enumerate(reversed(parts)):
767 for idx, part in enumerate(reversed(parts)):
768 data = ''.join([_pack('>B', _fm0version)] + part)
768 data = ''.join([_pack('>B', _fm0version)] + part)
769 keys['dump%i' % idx] = util.b85encode(data)
769 keys['dump%i' % idx] = util.b85encode(data)
770 return keys
770 return keys
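A rough sketch of the shape of the dictionary produced above: every value is a base85-encoded chunk that starts with the format-0 version byte and stays under the ~5300 byte payload budget. This check assumes a `repo` object is at hand and is only illustrative.

    keys = _pushkeyescape(sorted(repo.obsstore))
    for name, blob in sorted(keys.items()):
        assert name.startswith('dump')
        assert util.b85decode(blob)[0:1] == _pack('>B', _fm0version)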
771
771
772 def listmarkers(repo):
772 def listmarkers(repo):
773 """List markers over pushkey"""
773 """List markers over pushkey"""
774 if not repo.obsstore:
774 if not repo.obsstore:
775 return {}
775 return {}
776 return _pushkeyescape(sorted(repo.obsstore))
776 return _pushkeyescape(sorted(repo.obsstore))
777
777
778 def pushmarker(repo, key, old, new):
778 def pushmarker(repo, key, old, new):
779 """Push markers over pushkey"""
779 """Push markers over pushkey"""
780 if not key.startswith('dump'):
780 if not key.startswith('dump'):
781 repo.ui.warn(_('unknown key: %r') % key)
781 repo.ui.warn(_('unknown key: %r') % key)
782 return 0
782 return 0
783 if old:
783 if old:
784 repo.ui.warn(_('unexpected old value for %r') % key)
784 repo.ui.warn(_('unexpected old value for %r') % key)
785 return 0
785 return 0
786 data = util.b85decode(new)
786 data = util.b85decode(new)
787 lock = repo.lock()
787 lock = repo.lock()
788 try:
788 try:
789 tr = repo.transaction('pushkey: obsolete markers')
789 tr = repo.transaction('pushkey: obsolete markers')
790 try:
790 try:
791 repo.obsstore.mergemarkers(tr, data)
791 repo.obsstore.mergemarkers(tr, data)
792 repo.invalidatevolatilesets()
792 tr.close()
793 tr.close()
793 return 1
794 return 1
794 finally:
795 finally:
795 tr.release()
796 tr.release()
796 finally:
797 finally:
797 lock.release()
798 lock.release()
798
799
799 def getmarkers(repo, nodes=None):
800 def getmarkers(repo, nodes=None):
800 """returns markers known in a repository
801 """returns markers known in a repository
801
802
802 If <nodes> is specified, only markers "relevant" to those nodes are
803 If <nodes> is specified, only markers "relevant" to those nodes are
803 returned"""
804 returned"""
804 if nodes is None:
805 if nodes is None:
805 rawmarkers = repo.obsstore
806 rawmarkers = repo.obsstore
806 else:
807 else:
807 rawmarkers = repo.obsstore.relevantmarkers(nodes)
808 rawmarkers = repo.obsstore.relevantmarkers(nodes)
808
809
809 for markerdata in rawmarkers:
810 for markerdata in rawmarkers:
810 yield marker(repo, markerdata)
811 yield marker(repo, markerdata)

def relevantmarkers(repo, node):
    """all obsolete markers relevant to some revision"""
    for markerdata in repo.obsstore.relevantmarkers(node):
        yield marker(repo, markerdata)


def precursormarkers(ctx):
    """obsolete markers marking this changeset as a successor"""
    for data in ctx.repo().obsstore.precursors.get(ctx.node(), ()):
        yield marker(ctx.repo(), data)

def successormarkers(ctx):
    """obsolete markers making this changeset obsolete"""
    for data in ctx.repo().obsstore.successors.get(ctx.node(), ()):
        yield marker(ctx.repo(), data)

def allsuccessors(obsstore, nodes, ignoreflags=0):
    """Yield node for every successor of <nodes>.

    Some successors may be unknown locally.

    This is a linear yield unsuited to detecting split changesets. It includes
    initial nodes too."""
    remaining = set(nodes)
    seen = set(remaining)
    while remaining:
        current = remaining.pop()
        yield current
        for mark in obsstore.successors.get(current, ()):
            # ignore markers flagged with the specified flag
            if mark[2] & ignoreflags:
                continue
            for suc in mark[1]:
                if suc not in seen:
                    seen.add(suc)
                    remaining.add(suc)

def allprecursors(obsstore, nodes, ignoreflags=0):
    """Yield node for every precursor of <nodes>.

    Some precursors may be unknown locally.

    This is a linear yield unsuited to detecting folded changesets. It includes
    initial nodes too."""

    remaining = set(nodes)
    seen = set(remaining)
    while remaining:
        current = remaining.pop()
        yield current
        for mark in obsstore.precursors.get(current, ()):
            # ignore markers flagged with the specified flag
            if mark[2] & ignoreflags:
                continue
            suc = mark[0]
            if suc not in seen:
                seen.add(suc)
                remaining.add(suc)
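
# Illustrative, self-contained sketch (not part of this module): the traversal
# performed by allsuccessors()/allprecursors() above, reduced to a plain
# adjacency dict so it can run without an obsstore. The helper name and the
# example mapping are made up.
def _walkrelation(adjacency, nodes):
    remaining = set(nodes)
    seen = set(remaining)
    while remaining:
        current = remaining.pop()
        yield current
        for nxt in adjacency.get(current, ()):
            if nxt not in seen:
                seen.add(nxt)
                remaining.add(nxt)

# With a chain A -> A1 -> A2, the walk yields the initial node as well:
#     list(_walkrelation({'A': ['A1'], 'A1': ['A2']}, ['A'])) == ['A', 'A1', 'A2']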

def foreground(repo, nodes):
    """return all nodes in the "foreground" of other nodes

    The foreground of a revision is anything reachable using parent -> children
    or precursor -> successor relations. It is very similar to "descendant" but
    augmented with obsolescence information.

    Beware that obsolescence cycles may give surprising results in complex
    situations.
    """
    repo = repo.unfiltered()
    foreground = set(repo.set('%ln::', nodes))
    if repo.obsstore:
        # We only need this complicated logic if there is obsolescence
        # XXX will probably deserve an optimised revset.
        nm = repo.changelog.nodemap
        plen = -1
        # compute the whole set of successors or descendants
        while len(foreground) != plen:
            plen = len(foreground)
            succs = set(c.node() for c in foreground)
            mutable = [c.node() for c in foreground if c.mutable()]
            succs.update(allsuccessors(repo.obsstore, mutable))
            known = (n for n in succs if n in nm)
            foreground = set(repo.set('%ln::', known))
    return set(c.node() for c in foreground)


def successorssets(repo, initialnode, cache=None):
    """Return the sets of all latest successors of the initial node

    The successors set of a changeset A is the group of revisions that succeed
    A. It succeeds A as a consistent whole, each revision being only a partial
    replacement. The successors set contains non-obsolete changesets only.

    This function returns the full list of successor sets, which is why it
    returns a list of tuples and not just a single tuple. Each tuple is a valid
    successors set. Note that (A,) may be a valid successors set for changeset A
    (see below).

    In most cases, a changeset A will have a single element (e.g. the changeset
    A is replaced by A') in its successors set. Though, it is also common for a
    changeset A to have no elements in its successors set (e.g. the changeset
    has been pruned). Therefore, the returned list of successors sets will be
    [(A',)] or [], respectively.

    When a changeset A is split into A' and B', however, it will result in a
    successors set containing more than a single element, i.e. [(A',B')].
    Divergent changesets will result in multiple successors sets, i.e. [(A',),
    (A'')].

    If a changeset A is not obsolete, then it will conceptually have no
    successors set. To distinguish this from a pruned changeset, the successors
    set will contain itself only, i.e. [(A,)].

    Finally, successors unknown locally are considered to be pruned (obsoleted
    without any successors).

    The optional `cache` parameter is a dictionary that may contain precomputed
    successors sets. It is meant to reuse the computation of a previous call to
    `successorssets` when multiple calls are made at the same time. The cache
    dictionary is updated in place. The caller is responsible for its life
    span. Code that makes multiple calls to `successorssets` *must* use this
    cache mechanism or suffer terrible performance.
    """

    succmarkers = repo.obsstore.successors

    # Stack of nodes we search successors sets for
    toproceed = [initialnode]
    # set version of the above list for fast loop detection
    # elements added to "toproceed" must be added here
    stackedset = set(toproceed)
    if cache is None:
        cache = {}

    # This while loop is the flattened version of a recursive search for
    # successors sets
    #
    # def successorssets(x):
    #     successors = directsuccessors(x)
    #     ss = [[]]
    #     for succ in directsuccessors(x):
    #         # product as in itertools cartesian product
    #         ss = product(ss, successorssets(succ))
    #     return ss
    #
    # But we cannot use plain recursive calls here:
    # - that would blow the python call stack
    # - obsolescence markers may have cycles, we need to handle them.
    #
    # The `toproceed` list acts as our call stack. Every node we search
    # successors sets for is stacked there.
    #
    # The `stackedset` is a set version of this stack used to check if a node
    # is already stacked. This check is used to detect cycles and prevent
    # infinite loops.
    #
    # successors sets of all nodes are stored in the `cache` dictionary.
    #
    # After this while loop ends we use the cache to return the successors sets
    # for the node requested by the caller.
    while toproceed:
        # Every iteration tries to compute the successors sets of the topmost
        # node of the stack: CURRENT.
        #
        # There are four possible outcomes:
        #
        # 1) We already know the successors sets of CURRENT:
        #    -> mission accomplished, pop it from the stack.
        # 2) Node is not obsolete:
        #    -> the node is its own successors sets. Add it to the cache.
        # 3) We do not know the successors sets of direct successors of CURRENT:
        #    -> We add those successors to the stack.
        # 4) We know the successors sets of all direct successors of CURRENT:
        #    -> We can compute CURRENT successors set and add it to the
        #       cache.
        #
        current = toproceed[-1]
        if current in cache:
            # case (1): We already know the successors sets
            stackedset.remove(toproceed.pop())
        elif current not in succmarkers:
            # case (2): The node is not obsolete.
            if current in repo:
                # We have a valid last successor.
                cache[current] = [(current,)]
            else:
                # Final obsolete version is unknown locally.
                # Do not count that as a valid successor.
                cache[current] = []
        else:
            # cases (3) and (4)
            #
            # We proceed in two phases. Phase 1 aims to distinguish case (3)
            # from case (4):
            #
            # For each direct successor of CURRENT, we check whether its
            # successors sets are known. If they are not, we stack the
            # unknown node and proceed to the next iteration of the while
            # loop. (case 3)
            #
            # During this step, we may detect obsolescence cycles: a node
            # with unknown successors sets but already in the call stack.
            # In such a situation, we arbitrarily set the successors sets of
            # the node to nothing (node pruned) to break the cycle.
            #
            # If no break was encountered we proceed to phase 2.
            #
            # Phase 2 computes successors sets of CURRENT (case 4); see details
            # in phase 2 itself.
            #
            # Note the two levels of iteration in each phase.
            # - The first one handles obsolescence markers using CURRENT as
            #   precursor (successors markers of CURRENT).
            #
            #   Having multiple entries here means divergence.
            #
            # - The second one handles successors defined in each marker.
            #
            #   Having none means pruned node, multiple successors means split,
            #   single successors are standard replacement.
            #
            for mark in sorted(succmarkers[current]):
                for suc in mark[1]:
                    if suc not in cache:
                        if suc in stackedset:
                            # cycle breaking
                            cache[suc] = []
                        else:
                            # case (3) If we have not computed successors sets
                            # of one of those successors we add it to the
                            # `toproceed` stack and stop all work for this
                            # iteration.
                            toproceed.append(suc)
                            stackedset.add(suc)
                            break
                else:
                    continue
                break
            else:
                # case (4): we know all successors sets of all direct
                # successors
                #
                # Successors set contributed by each marker depends on the
                # successors sets of all its "successors" nodes.
                #
                # Each different marker is a divergence in the obsolescence
                # history. It contributes successors sets distinct from other
                # markers.
                #
                # Within a marker, a successor may have divergent successors
                # sets. In such a case, the marker will contribute multiple
                # divergent successors sets. If multiple successors have
                # divergent successors sets, a Cartesian product is used.
                #
                # At the end we post-process successors sets to remove
                # duplicated entries and successors sets that are strict
                # subsets of another one.
                succssets = []
                for mark in sorted(succmarkers[current]):
                    # successors sets contributed by this marker
                    markss = [[]]
                    for suc in mark[1]:
                        # Cartesian product with previous successors
                        productresult = []
                        for prefix in markss:
                            for suffix in cache[suc]:
                                newss = list(prefix)
                                for part in suffix:
                                    # do not duplicate entries in a successors
                                    # set; first entry wins.
                                    if part not in newss:
                                        newss.append(part)
                                productresult.append(newss)
                        markss = productresult
                    succssets.extend(markss)
                # remove duplicated and subset
                seen = []
                final = []
                candidate = sorted(((set(s), s) for s in succssets if s),
                                   key=lambda x: len(x[1]), reverse=True)
                for setversion, listversion in candidate:
                    for seenset in seen:
                        if setversion.issubset(seenset):
                            break
                    else:
                        final.append(listversion)
                        seen.append(setversion)
                final.reverse()  # put small successors sets first
                cache[current] = final
    return cache[initialnode]
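
# Illustrative sketch only (not taken from this file; the helper name and toy
# data are hypothetical). It isolates the Cartesian-product step of case (4)
# above: given the successors sets already computed for each successor listed
# in one marker, combine them into the successors sets that marker contributes.
def _combinesuccessorssets(*persuccessor):
    markss = [[]]
    for options in persuccessor:  # one entry per successor in the marker
        productresult = []
        for prefix in markss:
            for suffix in options:
                newss = list(prefix)
                for part in suffix:
                    if part not in newss:  # first entry wins, no duplicates
                        newss.append(part)
                productresult.append(newss)
        markss = productresult
    return markss

# If B's successors sets are [['B1'], ['B2']] (divergence) and C's are [['C1']],
# a marker A -> (B, C) contributes:
#     _combinesuccessorssets([['B1'], ['B2']], [['C1']])
#     == [['B1', 'C1'], ['B2', 'C1']]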

# mapping of 'set-name' -> <function to compute this set>
cachefuncs = {}
def cachefor(name):
    """Decorator to register a function as computing the cache for a set"""
    def decorator(func):
        assert name not in cachefuncs
        cachefuncs[name] = func
        return func
    return decorator

def getrevs(repo, name):
    """Return the set of revisions that belong to the <name> set

    Such access may compute the set and cache it for future use"""
    repo = repo.unfiltered()
    if not repo.obsstore:
        return frozenset()
    if name not in repo.obsstore.caches:
        repo.obsstore.caches[name] = cachefuncs[name](repo)
    return repo.obsstore.caches[name]
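
# Minimal, self-contained sketch (not part of this module; all names below are
# hypothetical) of the registry pattern used by cachefor()/getrevs() above: a
# decorator records the computing function under a set name, and lookups
# compute the set lazily and memoize it.
_examplefuncs = {}
_examplecaches = {}

def _examplecachefor(name):
    def decorator(func):
        assert name not in _examplefuncs
        _examplefuncs[name] = func
        return func
    return decorator

@_examplecachefor('evens')
def _computeevens(data):
    return frozenset(x for x in data if x % 2 == 0)

def _examplegetset(data, name):
    if name not in _examplecaches:
        _examplecaches[name] = _examplefuncs[name](data)
    return _examplecaches[name]

# _examplegetset(range(6), 'evens') == frozenset({0, 2, 4})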

# To keep things simple we need to invalidate the obsolescence caches when:
#
# - a new changeset is added
# - the public phase is changed
# - obsolescence markers are added
# - a repo is stripped
def clearobscaches(repo):
    """Remove all obsolescence-related caches from a repo

    This removes all caches in the obsstore if the obsstore already exists on
    the repo.

    (We could be smarter here given the exact event that triggered the cache
    clearing)"""
    # only clear caches if there is obsstore data in this repo
    if 'obsstore' in repo._filecache:
        repo.obsstore.caches.clear()

@cachefor('obsolete')
def _computeobsoleteset(repo):
    """the set of obsolete revisions"""
    obs = set()
    getnode = repo.changelog.node
    notpublic = repo._phasecache.getrevset(repo, (phases.draft, phases.secret))
    for r in notpublic:
        if getnode(r) in repo.obsstore.successors:
            obs.add(r)
    return obs

@cachefor('unstable')
def _computeunstableset(repo):
    """the set of non obsolete revisions with obsolete parents"""
    revs = [(ctx.rev(), ctx) for ctx in
            repo.set('(not public()) and (not obsolete())')]
    revs.sort(key=lambda x: x[0])
    unstable = set()
    for rev, ctx in revs:
        # A rev is unstable if one of its parents is obsolete or unstable;
        # this works since we traverse in increasing rev order
        if any((x.obsolete() or (x.rev() in unstable))
               for x in ctx.parents()):
            unstable.add(rev)
    return unstable
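
# Self-contained sketch (not part of this module; names and data are made up)
# showing why visiting revisions in increasing revision order is enough in
# _computeunstableset() above: parents always have smaller revision numbers,
# so their obsolete/unstable status is already known when a child is visited.
def _toyunstable(parentsmap, obsolete):
    unstable = set()
    for rev in sorted(parentsmap):
        if any(p in obsolete or p in unstable for p in parentsmap[rev]):
            unstable.add(rev)
    return unstable

# Rev 1 is obsolete, rev 2 is its child, rev 3 is a child of rev 2; the
# instability propagates down the chain:
#     _toyunstable({1: [0], 2: [1], 3: [2]}, obsolete={1}) == {2, 3}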

@cachefor('suspended')
def _computesuspendedset(repo):
    """the set of obsolete parents with non obsolete descendants"""
    suspended = repo.changelog.ancestors(getrevs(repo, 'unstable'))
    return set(r for r in getrevs(repo, 'obsolete') if r in suspended)

@cachefor('extinct')
def _computeextinctset(repo):
    """the set of obsolete parents without non obsolete descendants"""
    return getrevs(repo, 'obsolete') - getrevs(repo, 'suspended')


@cachefor('bumped')
def _computebumpedset(repo):
    """the set of revs trying to obsolete public revisions"""
    bumped = set()
    # util function (avoid attribute lookup in the loop)
    phase = repo._phasecache.phase  # would be faster to grab the full list
    public = phases.public
    cl = repo.changelog
    torev = cl.nodemap.get
    for ctx in repo.set('(not public()) and (not obsolete())'):
        rev = ctx.rev()
        # We only evaluate mutable, non-obsolete revisions
        node = ctx.node()
        # (future) A cache of precursors may be worth it if splits are very
        # common
        for pnode in allprecursors(repo.obsstore, [node],
                                   ignoreflags=bumpedfix):
            prev = torev(pnode)  # unfiltered! but so is phasecache
            if (prev is not None) and (phase(repo, prev) <= public):
                # we have a public precursor
                bumped.add(rev)
                break  # Next draft!
    return bumped

@cachefor('divergent')
def _computedivergentset(repo):
    """the set of revs that compete to be the final successor of some revision.
    """
    divergent = set()
    obsstore = repo.obsstore
    newermap = {}
    for ctx in repo.set('(not public()) - obsolete()'):
        mark = obsstore.precursors.get(ctx.node(), ())
        toprocess = set(mark)
        seen = set()
        while toprocess:
            prec = toprocess.pop()[0]
            if prec in seen:
                continue  # emergency cycle hanging prevention
            seen.add(prec)
            if prec not in newermap:
                successorssets(repo, prec, newermap)
            newer = [n for n in newermap[prec] if n]
            if len(newer) > 1:
                divergent.add(ctx.rev())
                break
            toprocess.update(obsstore.precursors.get(prec, ()))
    return divergent


def createmarkers(repo, relations, flag=0, date=None, metadata=None,
                  operation=None):
    """Add obsolete markers between changesets in a repo

    <relations> must be an iterable of (<old>, (<new>, ...)[,{metadata}])
    tuples. `old` and `news` are changectx. metadata is an optional dictionary
    containing metadata for this marker only. It is merged with the global
    metadata specified through the `metadata` argument of this function.

    Trying to obsolete a public changeset will raise an exception.

    The current user and date are used unless specified otherwise in the
    metadata attribute.

    This function operates within a transaction of its own, but does
    not take any lock on the repo.
    """
    # prepare metadata
    if metadata is None:
        metadata = {}
    if 'user' not in metadata:
        metadata['user'] = repo.ui.username()
    useoperation = repo.ui.configbool('experimental',
                                      'evolution.track-operation',
                                      False)
    if useoperation and operation:
        metadata['operation'] = operation
    tr = repo.transaction('add-obsolescence-marker')
    try:
        markerargs = []
        for rel in relations:
            prec = rel[0]
            sucs = rel[1]
            localmetadata = metadata.copy()
            if 2 < len(rel):
                localmetadata.update(rel[2])

            if not prec.mutable():
                raise error.Abort(_("cannot obsolete public changeset: %s")
                                  % prec,
                                  hint="see 'hg help phases' for details")
            nprec = prec.node()
            nsucs = tuple(s.node() for s in sucs)
            npare = None
            if not nsucs:
                npare = tuple(p.node() for p in prec.parents())
            if nprec in nsucs:
                raise error.Abort(_("changeset %s cannot obsolete itself")
                                  % prec)

            # Creating the marker causes the hidden cache to become invalid,
            # which causes recomputation when we ask for prec.parents() above.
            # This results in n^2 behavior. So let's prepare all of the args
            # first, then create the markers.
            markerargs.append((nprec, nsucs, npare, localmetadata))

        for args in markerargs:
            nprec, nsucs, npare, localmetadata = args
            repo.obsstore.create(tr, nprec, nsucs, flag, parents=npare,
                                 date=date, metadata=localmetadata)
        repo.filteredrevcache.clear()
        tr.close()
    finally:
        tr.release()
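
# Hedged usage sketch (not part of this file), based only on the signature and
# docstring of createmarkers() above; `repo`, `oldctx` and `newctx` are
# hypothetical objects supplied by a caller.
#
#     # record that `newctx` supersedes `oldctx` (e.g. after an amend-like edit)
#     createmarkers(repo, [(oldctx, (newctx,))], operation='amend')
#
#     # prune `oldctx` without a successor (empty successor tuple)
#     createmarkers(repo, [(oldctx, ())])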