bundle2: only grab a transaction when 'phase-heads' affect the repository...
Boris Feld
r34322:4ef472b9 default
@@ -1,1921 +1,1921 @@
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application-agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows

 - magic string
 - stream level parameters
 - payload parts (any number)
 - end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

The binary format is as follows

:params size: int32

    The total number of bytes used by the parameters

:params value: arbitrary number of bytes

    A blob of `params size` containing the serialized version of all stream
    level parameters.

    The blob contains a space separated list of parameters. Parameters with
    values are stored in the form `<name>=<value>`. Both name and value are
    urlquoted.

    Empty names are forbidden.

    Names MUST start with a letter. If the first letter is lower case, the
    parameter is advisory and can be safely ignored. However, when the first
    letter is capital, the parameter is mandatory and the bundling process
    MUST stop if it is unable to process it.

    Stream parameters use a simple textual format for two main reasons:

    - Stream level parameters should remain simple and we want to discourage
      any crazy usage.
    - Textual data allows easy human inspection of a bundle2 header in case
      of trouble.

    Any application level options MUST go into a bundle2 part instead.

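For illustration, a stream parameter blob carrying one mandatory and one
advisory parameter could look like (the advisory name is hypothetical)::

    Compression=BZ someadvisoryhint

`Compression=BZ` is mandatory (capital first letter) while `someadvisoryhint`
is advisory and value-less.
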
Payload part
------------------------

The binary format is as follows

:header size: int32

    The total number of bytes used by the part header. When the header is
    empty (size = 0) this is interpreted as the end of stream marker.

:header:

    The header defines how to interpret the part. It contains two pieces of
    data: the part type, and the part parameters.

    The part type is used to route the part to an application level handler
    that can interpret the payload.

    Part parameters are passed to the application level handler. They are
    meant to convey information that will help the application level handler
    interpret the part payload.

    The binary format of the header is as follows

    :typesize: (one byte)

    :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

    :partid: A 32-bit integer (unique in the bundle) that can be used to
             refer to this part.

    :parameters:

        A part's parameters may have arbitrary content; the binary structure
        is::

            <mandatory-count><advisory-count><param-sizes><param-data>

        :mandatory-count: 1 byte, number of mandatory parameters

        :advisory-count: 1 byte, number of advisory parameters

        :param-sizes:

            N pairs of bytes, where N is the total number of parameters. Each
            pair contains (<size-of-key>, <size-of-value>) for one parameter.

        :param-data:

            A blob of bytes from which each parameter key and value can be
            retrieved using the list of size pairs stored in the previous
            field.

        Mandatory parameters come first, then the advisory ones.

        Each parameter's key MUST be unique within the part.

:payload:

    The payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

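For illustration, an 11-byte payload sent as a single chunk is framed as::

    <int32: 11><11 bytes of chunk data><int32: 0>

and an empty payload reduces to the zero size chunk ``<int32: 0>``.
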
Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part
type contains any uppercase character it is considered mandatory. When no
handler is known for a mandatory part, the process is aborted and an
exception is raised. If the part is advisory and no handler is known, the
part is ignored. When the process is aborted, the full bundle is still read
from the stream to keep the channel usable. But none of the parts read after
an abort are processed. In the future, dropping the stream may become an
option for channels we do not care to preserve.
"""

from __future__ import absolute_import, division

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    phases,
    pushkey,
    pycompat,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains invalid characters"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>'+('BB'*nbparams)

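# A sketch of how the dynamic format above behaves: with two parameters the
# format string is '>BBBB', i.e. four one-byte sizes, one (key, value) size
# pair per parameter.
#
#   _makefpartparamsizes(2) == '>BBBB'
#   struct.calcsize(_makefpartparamsizes(2)) == 4
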
parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep records of what happens during an unbundle

    New records are added using `records.add('cat', obj)`. Where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__

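# Usage sketch for unbundlerecords (the category mirrors the one used by
# _processchangegroup below):
#
#   records = unbundlerecords()
#   records.add('changegroup', {'return': 1})
#   records['changegroup']   # -> ({'return': 1},)
#   list(records)            # -> [('changegroup', {'return': 1})]
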
class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.reply = None
        self.captureoutput = captureoutput
        self.hookargs = {}
        self._gettransaction = transactiongetter

    def gettransaction(self):
        transaction = self._gettransaction()

        if self.hookargs:
            # the ones added to the transaction supersede those added
            # to the operation.
            self.hookargs.update(transaction.hookargs)
            transaction.hookargs = self.hookargs

            # mark the hookargs as flushed. further attempts to add to
            # hookargs will result in an abort.
            self.hookargs = None

        return transaction

    def addhookargs(self, hookargs):
        if self.hookargs is None:
            raise error.ProgrammingError('attempted to add hookargs to '
                                         'operation after transaction started')
        self.hookargs.update(hookargs)

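# Sketch of the hookargs handoff implemented by bundleoperation above:
#
#   op = bundleoperation(repo, transactiongetter)
#   op.addhookargs({'source': 'push'})  # fine, transaction not started yet
#   tr = op.gettransaction()            # hookargs flushed into tr and frozen
#   op.addhookargs({'url': u})          # raises error.ProgrammingError
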
class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op

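# Call-site sketch for applybundle (argument values are assumed for
# illustration):
#
#   with repo.transaction('unbundle') as tr:
#       op = applybundle(repo, unbundler, tr, source='push', url=url)
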
class partiterator(object):
    def __init__(self, repo, op, unbundler):
        self.repo = repo
        self.op = op
        self.unbundler = unbundler
        self.iterator = None
        self.count = 0
        self.current = None

    def __enter__(self):
        def func():
            itr = enumerate(self.unbundler.iterparts())
            for count, p in itr:
                self.count = count
                self.current = p
                yield p
                p.seek(0, 2)
                self.current = None
        self.iterator = func()
        return self.iterator

    def __exit__(self, type, exc, tb):
        if not self.iterator:
            return

        if exc:
            # If exiting or interrupted, do not attempt to seek the stream in
            # the finally block below. This makes abort faster.
            if (self.current and
                not isinstance(exc, (SystemExit, KeyboardInterrupt))):
                # consume the part content to not corrupt the stream.
                self.current.seek(0, 2)

            # Any exceptions seeking to the end of the bundle at this point are
            # almost certainly related to the underlying stream being bad.
            # And, chances are that the exception we're handling is related to
            # getting in that bad state. So, we swallow the seeking error and
            # re-raise the original error.
            seekerror = False
            try:
                for part in self.iterator:
                    # consume the bundle content
                    part.seek(0, 2)
            except Exception:
                seekerror = True

            # Small hack to let caller code distinguish exceptions from bundle2
            # processing from processing the old format. This is mostly needed
            # to handle different return codes to unbundle according to the type
            # of bundle. We should probably clean up or drop this return code
            # craziness in a future version.
            exc.duringunbundle2 = True
            salvaged = []
            replycaps = None
            if self.op.reply is not None:
                salvaged = self.op.reply.salvageoutput()
                replycaps = self.op.reply.capabilities
            exc._replycaps = replycaps
            exc._bundle2salvagedoutput = salvaged

            # Re-raising from a variable loses the original stack. So only use
            # that form if we need to.
            if seekerror:
                raise exc

        self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
                           self.count)

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op._gettransaction is None or op._gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))

    processparts(repo, op, unbundler)

    return op

def processparts(repo, op, unbundler):
    with partiterator(repo, op, unbundler) as parts:
        for part in parts:
            _processpart(op, part)

def _processchangegroup(op, cg, tr, source, url, **kwargs):
    ret = cg.apply(op.repo, tr, source, url, **kwargs)
    op.records.add('changegroup', {
        'return': ret,
    })
    return ret

def _gethandler(op, part):
    status = 'unknown' # used by debug output
    try:
        handler = parthandlermapping.get(part.type)
        if handler is None:
            status = 'unsupported-type'
            raise error.BundleUnknownFeatureError(parttype=part.type)
        indebug(op.ui, 'found a handler for part %s' % part.type)
        unknownparams = part.mandatorykeys - handler.params
        if unknownparams:
            unknownparams = list(unknownparams)
            unknownparams.sort()
            status = 'unsupported-params (%s)' % ', '.join(unknownparams)
            raise error.BundleUnknownFeatureError(parttype=part.type,
                                                  params=unknownparams)
        status = 'supported'
    except error.BundleUnknownFeatureError as exc:
        if part.mandatory: # mandatory parts
            raise
        indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
        return # skip to part processing
    finally:
        if op.ui.debugflag:
            msg = ['bundle2-input-part: "%s"' % part.type]
            if not part.mandatory:
                msg.append(' (advisory)')
            nbmp = len(part.mandatorykeys)
            nbap = len(part.params) - nbmp
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            msg.append(' %s\n' % status)
            op.ui.debug(''.join(msg))

    return handler

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    handler = _gethandler(op, part)
    if handler is None:
        return

    # handler is called outside the above try block so that we don't
    # risk catching KeyErrors from anything other than the
    # parthandlermapping lookup (any KeyError raised by handler()
    # itself represents a defect of a different variety).
    output = None
    if op.captureoutput and op.reply is not None:
        op.ui.pushbuffer(error=True, subproc=True)
        output = ''
    try:
        handler(op, part)
    finally:
        if output is not None:
            output = op.ui.popbuffer()
        if output:
            outpart = op.reply.newpart('output', data=output,
                                       mandatory=False)
            outpart.addparam(
                'in-reply-to', pycompat.bytestr(part.id), mandatory=False)

def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

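# Round-trip sketch for the two helpers above:
#
#   decodecaps('HG20\nchangegroup=01,02')
#       == {'HG20': [], 'changegroup': ['01', '02']}
#   encodecaps({'HG20': [], 'changegroup': ['01', '02']})
#       == 'HG20\nchangegroup=01,02'
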
bundletypes = {
    "": ("", 'UN'),       # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters, and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError(r'empty parameter name')
        if name[0:1] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError(r'non letter first character: %s' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        As the part is directly added to the container, any failure to
        properly initialize the part after calling ``newpart`` should result
        in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged


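# Emission sketch for bundle20 (assumes a ui object is at hand):
#
#   bundler = bundle20(ui)
#   bundler.setcompression('GZ')
#   bundler.newpart('output', data='hello', mandatory=False)
#   raw = ''.join(bundler.getchunks())  # magic + params + parts + end marker
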
class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

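# Consumption sketch pairing getunbundler with unbundle20.iterparts below:
#
#   unbundler = getunbundler(ui, fp)  # reads and validates the 4-byte magic
#   for part in unbundler.iterparts():
#       part.params                   # parsed part parameters
#       data = part.read()            # optional; iterparts seeks past anyway
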
class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process and apply all parameters in a stream level blob"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory; this function will raise an error when they are unknown.

        Note: no options are currently supported. Any input will either be
        ignored or fail.
        """
        if not name:
            raise ValueError(r'empty parameter name')
        if name[0:1] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError(r'non letter first character: %s' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0:1].islower():
                indebug(self.ui, "ignoring unknown parameter %s" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

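    # Behaviour sketch for _processparam above (parameter names are
    # hypothetical):
    #
    #   unbundler._processparam('unknownhint', None)  # advisory: ignored
    #   unbundler._processparam('Unknownhint', None)  # mandatory: raises
    #                                                 # BundleUnknownFeatureError
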
    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends, relying on the bundle
        being interpreted to find its end. This is terrible and we are sorry,
        but we needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)


    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            # Seek to the end of the part to force its consumption so the next
            # part can be read. But then seek back to the beginning so the
            # code consuming this generator has a part that starts at 0.
            part.seek(0, 2)
            part.seek(0)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')
864
864
    def _readpartheader(self):
        """reads a part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params  # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator
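
# Illustrative registration (hypothetical parameter name, not part of the
# original module): a handler receives the unbundler and the parameter's
# name and value, mirroring processcompression below.
#
#   @b2streamparamhandler('myfeature')
#   def processmyfeature(unbundler, param, value):
#       # remember the advisory value for later use
#       unbundler._myfeature = value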

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True

class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. The remote side
    should be able to safely ignore the advisory ones.

    Neither the data nor the parameters can be modified after generation has
    begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently being generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
                % (cls, id(self), self.id, self.type, self.mandatory))

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))
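
    # Illustrative usage (assumed part type and values, not part of the
    # original module): parameters land in the mandatory or advisory list
    # depending on the ``mandatory`` flag.
    #
    #   part = bundlepart('output', data='hello')
    #   part.addparam('format', 'text')                # mandatory by default
    #   part.addparam('verbose', '1', mandatory=False)
    #   assert part.mandatoryparams == (('format', 'text'),)
    #   assert part.advisoryparams == (('verbose', '1'),)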

    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif (util.safehasattr(self.data, 'next')
                  or util.safehasattr(self.data, '__next__')):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        try:
            headerchunk = ''.join(header)
        except TypeError:
            raise TypeError(r'Found a non-bytes trying to '
                            r'build bundle part header: %r' % header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            bexc = util.forcebytestr(exc)
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % bexc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % bexc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True
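
    # Rough shape of the byte stream produced by getchunks() for one part
    # (an informal sketch, not a normative description of the format):
    #
    #   int32 header size | header: type, id, param counts, sizes, values
    #   int32 chunk size  | payload chunk              (repeated)
    #   int32 -1          | interruption part follows  (only on error)
    #   int32 0           | end of the part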

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if (util.safehasattr(self.data, 'next')
            or util.safehasattr(self.data, '__next__')):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows the transmission of exceptions raised on the producer side
    during part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        hardabort = False
        try:
            _processpart(op, part)
        except (SystemExit, KeyboardInterrupt):
            hardabort = True
            raise
        finally:
            if not hardabort:
                part.seek(0, 2)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = []  # (payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic-related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos, self._tellfp()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')
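
    # Illustrative lookup (assumed index content, not part of the original
    # module): with _chunkindex == [(0, f0), (10, f1), (25, f2)],
    # _findchunk(10) returns (1, 0) and _findchunk(13) returns (1, 3).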

    def _readheader(self):
        """read the header and set up the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of pairs again
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param values
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

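    # Note on the whence=2 convention in seek() above: the offset is
    # subtracted from the end position, so seek(0, 2) jumps to the end of
    # the payload (the idiom iterparts uses before rewinding with seek(0)).
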
    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    return caps
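
# Illustrative result shape (the values depend on the repo configuration;
# this is only an assumed example, not part of the original module):
#
#   {'HG20': (), 'changegroup': ('01', '02'),
#    'digests': ('md5', 'sha1', 'sha512'),
#    'error': ('abort', 'unsupportedcontent', 'pushraced', 'pushkey'),
#    'listkeys': (), 'pushkey': (),
#    'remote-changegroup': ('http', 'https'), 'hgtagsfnodes': ()}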

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]
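
# For instance (hypothetical input, not part of the original module):
#
#   obsmarkersversion({'obsmarkers': ('V0', 'V1')})   # -> [0, 1]
#   obsmarkersversion({})                             # -> []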

def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.makechangegroup(repo, outgoing, '01', source)
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The types of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now, so we keep them separated for the sake of
    # simplicity.

    # we always want a changegroup in such a bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                      mandatory=False)
    if opts.get('phases') and repo.revs('%ln and secret()',
                                        outgoing.missingheads):
        part.addparam('targetphase', '%d' % phases.secret, mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = phases.binaryencode(headsbyphase)
        bundler.newpart('phase-heads', data=phasedata)

def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changesets
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker
    format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only support v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
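
# Illustrative call (assumed arguments, not part of the original module):
#
#   writebundle(ui, cg, 'backup.hg', 'HG20', compression='BZ')
#
# would write a bzip2-compressed bundle2 file, while a 'HG10BZ' bundletype
# would take the legacy single-changegroup path instead.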

def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result
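
# Worked example (assumed record values, not from the original module):
# results == [2, 3] means one and two added heads respectively, so
# changedheads == 3 and the combined return value is 4; a single 0 result
# anywhere short-circuits the whole combination to 0.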

@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
                             'targetphase'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will be massively reworked
    before being inflicted on any end user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the ones contained
    # in the transaction.hookargs argument, so 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    extrakwargs = {}
    targetphase = inpart.params.get('targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()
1625
1625
1626 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1626 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1627 ['digest:%s' % k for k in util.DIGESTS.keys()])
1627 ['digest:%s' % k for k in util.DIGESTS.keys()])
1628 @parthandler('remote-changegroup', _remotechangegroupparams)
1628 @parthandler('remote-changegroup', _remotechangegroupparams)
1629 def handleremotechangegroup(op, inpart):
1629 def handleremotechangegroup(op, inpart):
1630 """apply a bundle10 on the repo, given an url and validation information
1630 """apply a bundle10 on the repo, given an url and validation information
1631
1631
1632 All the information about the remote bundle to import are given as
1632 All the information about the remote bundle to import are given as
1633 parameters. The parameters include:
1633 parameters. The parameters include:
1634 - url: the url to the bundle10.
1634 - url: the url to the bundle10.
1635 - size: the bundle10 file size. It is used to validate what was
1635 - size: the bundle10 file size. It is used to validate what was
1636 retrieved by the client matches the server knowledge about the bundle.
1636 retrieved by the client matches the server knowledge about the bundle.
1637 - digests: a space separated list of the digest types provided as
1637 - digests: a space separated list of the digest types provided as
1638 parameters.
1638 parameters.
1639 - digest:<digest-type>: the hexadecimal representation of the digest with
1639 - digest:<digest-type>: the hexadecimal representation of the digest with
1640 that name. Like the size, it is used to validate what was retrieved by
1640 that name. Like the size, it is used to validate what was retrieved by
1641 the client matches what the server knows about the bundle.
1641 the client matches what the server knows about the bundle.
1642
1642
1643 When multiple digest types are given, all of them are checked.
1643 When multiple digest types are given, all of them are checked.
1644 """
1644 """
1645 try:
1645 try:
1646 raw_url = inpart.params['url']
1646 raw_url = inpart.params['url']
1647 except KeyError:
1647 except KeyError:
1648 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1648 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1649 parsed_url = util.url(raw_url)
1649 parsed_url = util.url(raw_url)
1650 if parsed_url.scheme not in capabilities['remote-changegroup']:
1650 if parsed_url.scheme not in capabilities['remote-changegroup']:
1651 raise error.Abort(_('remote-changegroup does not support %s urls') %
1651 raise error.Abort(_('remote-changegroup does not support %s urls') %
1652 parsed_url.scheme)
1652 parsed_url.scheme)
1653
1653
1654 try:
1654 try:
1655 size = int(inpart.params['size'])
1655 size = int(inpart.params['size'])
1656 except ValueError:
1656 except ValueError:
1657 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1657 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1658 % 'size')
1658 % 'size')
1659 except KeyError:
1659 except KeyError:
1660 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1660 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1661
1661
1662 digests = {}
1662 digests = {}
1663 for typ in inpart.params.get('digests', '').split():
1663 for typ in inpart.params.get('digests', '').split():
1664 param = 'digest:%s' % typ
1664 param = 'digest:%s' % typ
1665 try:
1665 try:
1666 value = inpart.params[param]
1666 value = inpart.params[param]
1667 except KeyError:
1667 except KeyError:
1668 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1668 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1669 param)
1669 param)
1670 digests[typ] = value
1670 digests[typ] = value
1671
1671
1672 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1672 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1673
1673
1674 tr = op.gettransaction()
1674 tr = op.gettransaction()
1675 from . import exchange
1675 from . import exchange
1676 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1676 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1677 if not isinstance(cg, changegroup.cg1unpacker):
1677 if not isinstance(cg, changegroup.cg1unpacker):
1678 raise error.Abort(_('%s: not a bundle version 1.0') %
1678 raise error.Abort(_('%s: not a bundle version 1.0') %
1679 util.hidepassword(raw_url))
1679 util.hidepassword(raw_url))
1680 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1680 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1681 if op.reply is not None:
1681 if op.reply is not None:
1682 # This is definitely not the final form of this
1682 # This is definitely not the final form of this
1683 # return. But one needs to start somewhere.
1683 # return. But one needs to start somewhere.
1684 part = op.reply.newpart('reply:changegroup')
1684 part = op.reply.newpart('reply:changegroup')
1685 part.addparam(
1685 part.addparam(
1686 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1686 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1687 part.addparam('return', '%i' % ret, mandatory=False)
1687 part.addparam('return', '%i' % ret, mandatory=False)
1688 try:
1688 try:
1689 real_part.validate()
1689 real_part.validate()
1690 except error.Abort as e:
1690 except error.Abort as e:
1691 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1691 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1692 (util.hidepassword(raw_url), str(e)))
1692 (util.hidepassword(raw_url), str(e)))
1693 assert not inpart.read()
1693 assert not inpart.read()
1694
1694
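The size and digest checks above are what make remote-changegroup safe: the client re-derives both values from the downloaded bytes and compares them to the part's parameters. As a rough standalone sketch of that idea (names and error wording are illustrative, not Mercurial's API; only the standard hashlib module is assumed):

    import hashlib

    def checkbundle(data, expectedsize, hexdigests):
        # 'hexdigests' mirrors the 'digest:<type>' params: algorithm
        # name (e.g. 'sha1') -> expected hexadecimal digest.
        if len(data) != expectedsize:
            raise ValueError('size mismatch: expected %d, got %d'
                             % (expectedsize, len(data)))
        for typ, expected in hexdigests.items():
            actual = hashlib.new(typ, data).hexdigest()
            if actual != expected:
                raise ValueError('%s digest mismatch' % typ)

    payload = b'bundle bytes'
    checkbundle(payload, len(payload),
                {'sha1': hashlib.sha1(payload).hexdigest()})

When multiple digest types are listed, all of them are checked, as the loop over the 'digests' param above does.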
1695 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1695 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1696 def handlereplychangegroup(op, inpart):
1696 def handlereplychangegroup(op, inpart):
1697 ret = int(inpart.params['return'])
1697 ret = int(inpart.params['return'])
1698 replyto = int(inpart.params['in-reply-to'])
1698 replyto = int(inpart.params['in-reply-to'])
1699 op.records.add('changegroup', {'return': ret}, replyto)
1699 op.records.add('changegroup', {'return': ret}, replyto)
1700
1700
1701 @parthandler('check:heads')
1701 @parthandler('check:heads')
1702 def handlecheckheads(op, inpart):
1702 def handlecheckheads(op, inpart):
1703 """check that head of the repo did not change
1703 """check that head of the repo did not change
1704
1704
1705 This is used to detect a push race when using unbundle.
1705 This is used to detect a push race when using unbundle.
1706 This replaces the "heads" argument of unbundle."""
1706 This replaces the "heads" argument of unbundle."""
1707 h = inpart.read(20)
1707 h = inpart.read(20)
1708 heads = []
1708 heads = []
1709 while len(h) == 20:
1709 while len(h) == 20:
1710 heads.append(h)
1710 heads.append(h)
1711 h = inpart.read(20)
1711 h = inpart.read(20)
1712 assert not h
1712 assert not h
1713 # Trigger a transaction so that we are guaranteed to have the lock now.
1713 # Trigger a transaction so that we are guaranteed to have the lock now.
1714 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1714 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1715 op.gettransaction()
1715 op.gettransaction()
1716 if sorted(heads) != sorted(op.repo.heads()):
1716 if sorted(heads) != sorted(op.repo.heads()):
1717 raise error.PushRaced('repository changed while pushing - '
1717 raise error.PushRaced('repository changed while pushing - '
1718 'please try again')
1718 'please try again')
1719
1719
1720 @parthandler('check:updated-heads')
1720 @parthandler('check:updated-heads')
1721 def handlecheckupdatedheads(op, inpart):
1721 def handlecheckupdatedheads(op, inpart):
1722 """check for race on the heads touched by a push
1722 """check for race on the heads touched by a push
1723
1723
1724 This is similar to 'check:heads' but focuses on the heads actually updated
1724 This is similar to 'check:heads' but focuses on the heads actually updated
1725 during the push. If other activity happens on unrelated heads, it is
1725 during the push. If other activity happens on unrelated heads, it is
1726 ignored.
1726 ignored.
1727
1727
1728 This allows servers with high traffic to avoid push contention as long as
1728 This allows servers with high traffic to avoid push contention as long as
1729 only unrelated parts of the graph are involved."""
1729 only unrelated parts of the graph are involved."""
1730 h = inpart.read(20)
1730 h = inpart.read(20)
1731 heads = []
1731 heads = []
1732 while len(h) == 20:
1732 while len(h) == 20:
1733 heads.append(h)
1733 heads.append(h)
1734 h = inpart.read(20)
1734 h = inpart.read(20)
1735 assert not h
1735 assert not h
1736 # trigger a transaction so that we are guaranteed to have the lock now.
1736 # trigger a transaction so that we are guaranteed to have the lock now.
1737 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1737 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1738 op.gettransaction()
1738 op.gettransaction()
1739
1739
1740 currentheads = set()
1740 currentheads = set()
1741 for ls in op.repo.branchmap().itervalues():
1741 for ls in op.repo.branchmap().itervalues():
1742 currentheads.update(ls)
1742 currentheads.update(ls)
1743
1743
1744 for h in heads:
1744 for h in heads:
1745 if h not in currentheads:
1745 if h not in currentheads:
1746 raise error.PushRaced('repository changed while pushing - '
1746 raise error.PushRaced('repository changed while pushing - '
1747 'please try again')
1747 'please try again')
1748
1748
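Put side by side, the two race checks differ only in what they compare: 'check:heads' rejects any head movement at all, while 'check:updated-heads' only requires that the heads the push was built against still exist. A toy model of both checks, with plain strings standing in for the 20-byte binary nodes:

    def check_all_heads(expected, current):
        # 'check:heads': any change to the repo's head set is a race.
        if sorted(expected) != sorted(current):
            raise RuntimeError('repository changed while pushing')

    def check_updated_heads(touched, current):
        # 'check:updated-heads': only the heads this push builds on matter;
        # churn on unrelated heads is tolerated.
        if set(touched) - set(current):
            raise RuntimeError('repository changed while pushing')

    current = {'h1', 'h2', 'h3'}
    check_updated_heads({'h1'}, current)          # fine even if 'h4' appeared
    check_all_heads(['h1', 'h2', 'h3'], current)  # fails on any head change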
1749 @parthandler('output')
1749 @parthandler('output')
1750 def handleoutput(op, inpart):
1750 def handleoutput(op, inpart):
1751 """forward output captured on the server to the client"""
1751 """forward output captured on the server to the client"""
1752 for line in inpart.read().splitlines():
1752 for line in inpart.read().splitlines():
1753 op.ui.status(_('remote: %s\n') % line)
1753 op.ui.status(_('remote: %s\n') % line)
1754
1754
1755 @parthandler('replycaps')
1755 @parthandler('replycaps')
1756 def handlereplycaps(op, inpart):
1756 def handlereplycaps(op, inpart):
1757 """Notify that a reply bundle should be created
1757 """Notify that a reply bundle should be created
1758
1758
1759 The payload contains the capabilities information for the reply"""
1759 The payload contains the capabilities information for the reply"""
1760 caps = decodecaps(inpart.read())
1760 caps = decodecaps(inpart.read())
1761 if op.reply is None:
1761 if op.reply is None:
1762 op.reply = bundle20(op.ui, caps)
1762 op.reply = bundle20(op.ui, caps)
1763
1763
1764 class AbortFromPart(error.Abort):
1764 class AbortFromPart(error.Abort):
1765 """Sub-class of Abort that denotes an error from a bundle2 part."""
1765 """Sub-class of Abort that denotes an error from a bundle2 part."""
1766
1766
1767 @parthandler('error:abort', ('message', 'hint'))
1767 @parthandler('error:abort', ('message', 'hint'))
1768 def handleerrorabort(op, inpart):
1768 def handleerrorabort(op, inpart):
1769 """Used to transmit abort error over the wire"""
1769 """Used to transmit abort error over the wire"""
1770 raise AbortFromPart(inpart.params['message'],
1770 raise AbortFromPart(inpart.params['message'],
1771 hint=inpart.params.get('hint'))
1771 hint=inpart.params.get('hint'))
1772
1772
1773 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1773 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1774 'in-reply-to'))
1774 'in-reply-to'))
1775 def handleerrorpushkey(op, inpart):
1775 def handleerrorpushkey(op, inpart):
1776 """Used to transmit failure of a mandatory pushkey over the wire"""
1776 """Used to transmit failure of a mandatory pushkey over the wire"""
1777 kwargs = {}
1777 kwargs = {}
1778 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1778 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1779 value = inpart.params.get(name)
1779 value = inpart.params.get(name)
1780 if value is not None:
1780 if value is not None:
1781 kwargs[name] = value
1781 kwargs[name] = value
1782 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1782 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1783
1783
1784 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1784 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1785 def handleerrorunsupportedcontent(op, inpart):
1785 def handleerrorunsupportedcontent(op, inpart):
1786 """Used to transmit unknown content error over the wire"""
1786 """Used to transmit unknown content error over the wire"""
1787 kwargs = {}
1787 kwargs = {}
1788 parttype = inpart.params.get('parttype')
1788 parttype = inpart.params.get('parttype')
1789 if parttype is not None:
1789 if parttype is not None:
1790 kwargs['parttype'] = parttype
1790 kwargs['parttype'] = parttype
1791 params = inpart.params.get('params')
1791 params = inpart.params.get('params')
1792 if params is not None:
1792 if params is not None:
1793 kwargs['params'] = params.split('\0')
1793 kwargs['params'] = params.split('\0')
1794
1794
1795 raise error.BundleUnknownFeatureError(**kwargs)
1795 raise error.BundleUnknownFeatureError(**kwargs)
1796
1796
1797 @parthandler('error:pushraced', ('message',))
1797 @parthandler('error:pushraced', ('message',))
1798 def handleerrorpushraced(op, inpart):
1798 def handleerrorpushraced(op, inpart):
1799 """Used to transmit push race error over the wire"""
1799 """Used to transmit push race error over the wire"""
1800 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1800 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1801
1801
1802 @parthandler('listkeys', ('namespace',))
1802 @parthandler('listkeys', ('namespace',))
1803 def handlelistkeys(op, inpart):
1803 def handlelistkeys(op, inpart):
1804 """retrieve pushkey namespace content stored in a bundle2"""
1804 """retrieve pushkey namespace content stored in a bundle2"""
1805 namespace = inpart.params['namespace']
1805 namespace = inpart.params['namespace']
1806 r = pushkey.decodekeys(inpart.read())
1806 r = pushkey.decodekeys(inpart.read())
1807 op.records.add('listkeys', (namespace, r))
1807 op.records.add('listkeys', (namespace, r))
1808
1808
1809 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1809 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1810 def handlepushkey(op, inpart):
1810 def handlepushkey(op, inpart):
1811 """process a pushkey request"""
1811 """process a pushkey request"""
1812 dec = pushkey.decode
1812 dec = pushkey.decode
1813 namespace = dec(inpart.params['namespace'])
1813 namespace = dec(inpart.params['namespace'])
1814 key = dec(inpart.params['key'])
1814 key = dec(inpart.params['key'])
1815 old = dec(inpart.params['old'])
1815 old = dec(inpart.params['old'])
1816 new = dec(inpart.params['new'])
1816 new = dec(inpart.params['new'])
1817 # Grab the transaction to ensure that we have the lock before performing the
1817 # Grab the transaction to ensure that we have the lock before performing the
1818 # pushkey.
1818 # pushkey.
1819 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1819 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1820 op.gettransaction()
1820 op.gettransaction()
1821 ret = op.repo.pushkey(namespace, key, old, new)
1821 ret = op.repo.pushkey(namespace, key, old, new)
1822 record = {'namespace': namespace,
1822 record = {'namespace': namespace,
1823 'key': key,
1823 'key': key,
1824 'old': old,
1824 'old': old,
1825 'new': new}
1825 'new': new}
1826 op.records.add('pushkey', record)
1826 op.records.add('pushkey', record)
1827 if op.reply is not None:
1827 if op.reply is not None:
1828 rpart = op.reply.newpart('reply:pushkey')
1828 rpart = op.reply.newpart('reply:pushkey')
1829 rpart.addparam(
1829 rpart.addparam(
1830 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1830 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1831 rpart.addparam('return', '%i' % ret, mandatory=False)
1831 rpart.addparam('return', '%i' % ret, mandatory=False)
1832 if inpart.mandatory and not ret:
1832 if inpart.mandatory and not ret:
1833 kwargs = {}
1833 kwargs = {}
1834 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1834 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1835 if key in inpart.params:
1835 if key in inpart.params:
1836 kwargs[key] = inpart.params[key]
1836 kwargs[key] = inpart.params[key]
1837 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1837 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1838
1838
1839 @parthandler('phase-heads')
1839 @parthandler('phase-heads')
1840 def handlephases(op, inpart):
1840 def handlephases(op, inpart):
1841 """apply phases from bundle part to repo"""
1841 """apply phases from bundle part to repo"""
1842 headsbyphase = phases.binarydecode(inpart)
1842 headsbyphase = phases.binarydecode(inpart)
1843 phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
1843 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
1844 op.records.add('phase-heads', {})
1844 op.records.add('phase-heads', {})
1845
1845
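The one-character hunk above is the point of this changeset: handlephases now hands updatephases the bound method op.gettransaction itself rather than its result, so the transaction (and with it the repository lock) is only taken if phase data actually needs applying. A minimal sketch of such a memoizing getter; the opener callable and assertions are illustrative, not Mercurial code:

    class LazyTransaction(object):
        """Open the underlying transaction on first call only; later calls
        return the same object (mimicking bundleoperation.gettransaction)."""
        def __init__(self, opener):
            self._opener = opener   # zero-arg callable doing the real open
            self._tr = None
        def __call__(self):
            if self._tr is None:
                self._tr = self._opener()   # the lock would be taken here
            return self._tr

    opened = []
    trgetter = LazyTransaction(lambda: opened.append('tr') or 'tr')
    assert opened == []          # nothing opened until first use
    trgetter(); trgetter()
    assert opened == ['tr']      # opened exactly once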
1846 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1846 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1847 def handlepushkeyreply(op, inpart):
1847 def handlepushkeyreply(op, inpart):
1848 """retrieve the result of a pushkey request"""
1848 """retrieve the result of a pushkey request"""
1849 ret = int(inpart.params['return'])
1849 ret = int(inpart.params['return'])
1850 partid = int(inpart.params['in-reply-to'])
1850 partid = int(inpart.params['in-reply-to'])
1851 op.records.add('pushkey', {'return': ret}, partid)
1851 op.records.add('pushkey', {'return': ret}, partid)
1852
1852
1853 @parthandler('obsmarkers')
1853 @parthandler('obsmarkers')
1854 def handleobsmarker(op, inpart):
1854 def handleobsmarker(op, inpart):
1855 """add a stream of obsmarkers to the repo"""
1855 """add a stream of obsmarkers to the repo"""
1856 tr = op.gettransaction()
1856 tr = op.gettransaction()
1857 markerdata = inpart.read()
1857 markerdata = inpart.read()
1858 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
1858 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
1859 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1859 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1860 % len(markerdata))
1860 % len(markerdata))
1861 # The mergemarkers call will crash if marker creation is not enabled.
1861 # The mergemarkers call will crash if marker creation is not enabled.
1862 # we want to avoid this if the part is advisory.
1862 # we want to avoid this if the part is advisory.
1863 if not inpart.mandatory and op.repo.obsstore.readonly:
1863 if not inpart.mandatory and op.repo.obsstore.readonly:
1864 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1864 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1865 return
1865 return
1866 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1866 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1867 op.repo.invalidatevolatilesets()
1867 op.repo.invalidatevolatilesets()
1868 if new:
1868 if new:
1869 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1869 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1870 op.records.add('obsmarkers', {'new': new})
1870 op.records.add('obsmarkers', {'new': new})
1871 if op.reply is not None:
1871 if op.reply is not None:
1872 rpart = op.reply.newpart('reply:obsmarkers')
1872 rpart = op.reply.newpart('reply:obsmarkers')
1873 rpart.addparam(
1873 rpart.addparam(
1874 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1874 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1875 rpart.addparam('new', '%i' % new, mandatory=False)
1875 rpart.addparam('new', '%i' % new, mandatory=False)
1876
1876
1877
1877
1878 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1878 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1879 def handleobsmarkerreply(op, inpart):
1879 def handleobsmarkerreply(op, inpart):
1880 """retrieve the result of a pushkey request"""
1880 """retrieve the result of a pushkey request"""
1881 ret = int(inpart.params['new'])
1881 ret = int(inpart.params['new'])
1882 partid = int(inpart.params['in-reply-to'])
1882 partid = int(inpart.params['in-reply-to'])
1883 op.records.add('obsmarkers', {'new': ret}, partid)
1883 op.records.add('obsmarkers', {'new': ret}, partid)
1884
1884
1885 @parthandler('hgtagsfnodes')
1885 @parthandler('hgtagsfnodes')
1886 def handlehgtagsfnodes(op, inpart):
1886 def handlehgtagsfnodes(op, inpart):
1887 """Applies .hgtags fnodes cache entries to the local repo.
1887 """Applies .hgtags fnodes cache entries to the local repo.
1888
1888
1889 Payload is pairs of 20 byte changeset nodes and filenodes.
1889 Payload is pairs of 20 byte changeset nodes and filenodes.
1890 """
1890 """
1891 # Grab the transaction so we ensure that we have the lock at this point.
1891 # Grab the transaction so we ensure that we have the lock at this point.
1892 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1892 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1893 op.gettransaction()
1893 op.gettransaction()
1894 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1894 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1895
1895
1896 count = 0
1896 count = 0
1897 while True:
1897 while True:
1898 node = inpart.read(20)
1898 node = inpart.read(20)
1899 fnode = inpart.read(20)
1899 fnode = inpart.read(20)
1900 if len(node) < 20 or len(fnode) < 20:
1900 if len(node) < 20 or len(fnode) < 20:
1901 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1901 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1902 break
1902 break
1903 cache.setfnode(node, fnode)
1903 cache.setfnode(node, fnode)
1904 count += 1
1904 count += 1
1905
1905
1906 cache.write()
1906 cache.write()
1907 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1907 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1908
1908
1909 @parthandler('pushvars')
1909 @parthandler('pushvars')
1910 def bundle2getvars(op, part):
1910 def bundle2getvars(op, part):
1911 '''unbundle a bundle2 containing shellvars on the server'''
1911 '''unbundle a bundle2 containing shellvars on the server'''
1912 # An option to disable unbundling on server-side for security reasons
1912 # An option to disable unbundling on server-side for security reasons
1913 if op.ui.configbool('push', 'pushvars.server'):
1913 if op.ui.configbool('push', 'pushvars.server'):
1914 hookargs = {}
1914 hookargs = {}
1915 for key, value in part.advisoryparams:
1915 for key, value in part.advisoryparams:
1916 key = key.upper()
1916 key = key.upper()
1917 # We want pushed variables to have USERVAR_ prepended so we know
1917 # We want pushed variables to have USERVAR_ prepended so we know
1918 # they came from the --pushvar flag.
1918 # they came from the --pushvar flag.
1919 key = "USERVAR_" + key
1919 key = "USERVAR_" + key
1920 hookargs[key] = value
1920 hookargs[key] = value
1921 op.addhookargs(hookargs)
1921 op.addhookargs(hookargs)
@@ -1,627 +1,634 b''
1 """ Mercurial phases support code
1 """ Mercurial phases support code
2
2
3 ---
3 ---
4
4
5 Copyright 2011 Pierre-Yves David <pierre-yves.david@ens-lyon.org>
5 Copyright 2011 Pierre-Yves David <pierre-yves.david@ens-lyon.org>
6 Logilab SA <contact@logilab.fr>
6 Logilab SA <contact@logilab.fr>
7 Augie Fackler <durin42@gmail.com>
7 Augie Fackler <durin42@gmail.com>
8
8
9 This software may be used and distributed according to the terms
9 This software may be used and distributed according to the terms
10 of the GNU General Public License version 2 or any later version.
10 of the GNU General Public License version 2 or any later version.
11
11
12 ---
12 ---
13
13
14 This module implements most phase logic in mercurial.
14 This module implements most phase logic in mercurial.
15
15
16
16
17 Basic Concept
17 Basic Concept
18 =============
18 =============
19
19
20 A 'changeset phase' is an indicator that tells us how a changeset is
20 A 'changeset phase' is an indicator that tells us how a changeset is
21 manipulated and communicated. The details of each phase are described
21 manipulated and communicated. The details of each phase are described
22 below; here we describe the properties they have in common.
22 below; here we describe the properties they have in common.
23
23
24 Like bookmarks, phases are not stored in history and thus are not
24 Like bookmarks, phases are not stored in history and thus are not
25 permanent and leave no audit trail.
25 permanent and leave no audit trail.
26
26
27 First, no changeset can be in two phases at once. Phases are ordered,
27 First, no changeset can be in two phases at once. Phases are ordered,
28 so they can be considered from lowest to highest. The default, lowest
28 so they can be considered from lowest to highest. The default, lowest
29 phase is 'public' - this is the normal phase of existing changesets. A
29 phase is 'public' - this is the normal phase of existing changesets. A
30 child changeset cannot be in a lower phase than its parents.
30 child changeset cannot be in a lower phase than its parents.
31
31
32 These phases share a hierarchy of traits:
32 These phases share a hierarchy of traits:
33
33
34 immutable shared
34 immutable shared
35 public: X X
35 public: X X
36 draft: X
36 draft: X
37 secret:
37 secret:
38
38
39 Local commits are draft by default.
39 Local commits are draft by default.
40
40
41 Phase Movement and Exchange
41 Phase Movement and Exchange
42 ===========================
42 ===========================
43
43
44 Phase data is exchanged by pushkey on pull and push. Some servers have
44 Phase data is exchanged by pushkey on pull and push. Some servers have
45 a publish option set; we call such a server a "publishing server".
45 a publish option set; we call such a server a "publishing server".
46 Pushing a draft changeset to a publishing server changes the phase to
46 Pushing a draft changeset to a publishing server changes the phase to
47 public.
47 public.
48
48
49 A small list of facts/rules defines the exchange of phases:
49 A small list of facts/rules defines the exchange of phases:
50
50
51 * old client never changes server states
51 * old client never changes server states
52 * pull never changes server states
52 * pull never changes server states
53 * publish and old server changesets are seen as public by client
53 * publish and old server changesets are seen as public by client
54 * any secret changeset seen in another repository is lowered to at
54 * any secret changeset seen in another repository is lowered to at
55 least draft
55 least draft
56
56
57 Here is the final table summing up the 49 possible use cases of phase
57 Here is the final table summing up the 49 possible use cases of phase
58 exchange:
58 exchange:
59
59
60 server
60 server
61 old publish non-publish
61 old publish non-publish
62 N X N D P N D P
62 N X N D P N D P
63 old client
63 old client
64 pull
64 pull
65 N - X/X - X/D X/P - X/D X/P
65 N - X/X - X/D X/P - X/D X/P
66 X - X/X - X/D X/P - X/D X/P
66 X - X/X - X/D X/P - X/D X/P
67 push
67 push
68 X X/X X/X X/P X/P X/P X/D X/D X/P
68 X X/X X/X X/P X/P X/P X/D X/D X/P
69 new client
69 new client
70 pull
70 pull
71 N - P/X - P/D P/P - D/D P/P
71 N - P/X - P/D P/P - D/D P/P
72 D - P/X - P/D P/P - D/D P/P
72 D - P/X - P/D P/P - D/D P/P
73 P - P/X - P/D P/P - P/D P/P
73 P - P/X - P/D P/P - P/D P/P
74 push
74 push
75 D P/X P/X P/P P/P P/P D/D D/D P/P
75 D P/X P/X P/P P/P P/P D/D D/D P/P
76 P P/X P/X P/P P/P P/P P/P P/P P/P
76 P P/X P/X P/P P/P P/P P/P P/P P/P
77
77
78 Legend:
78 Legend:
79
79
80 A/B = final state on client / state on server
80 A/B = final state on client / state on server
81
81
82 * N = new/not present,
82 * N = new/not present,
83 * P = public,
83 * P = public,
84 * D = draft,
84 * D = draft,
85 * X = not tracked (i.e., the old client or server has no internal
85 * X = not tracked (i.e., the old client or server has no internal
86 way of recording the phase.)
86 way of recording the phase.)
87
87
88 passive = only pushes
88 passive = only pushes
89
89
90
90
91 A cell here can be read like this:
91 A cell here can be read like this:
92
92
93 "When a new client pushes a draft changeset (D) to a publishing
93 "When a new client pushes a draft changeset (D) to a publishing
94 server where it's not present (N), it's marked public on both
94 server where it's not present (N), it's marked public on both
95 sides (P/P)."
95 sides (P/P)."
96
96
97 Note: old clients behave as a publishing server with draft-only content
97 Note: old clients behave as a publishing server with draft-only content
98 - other people see it as public
98 - other people see it as public
99 - content is pushed as draft
99 - content is pushed as draft
100
100
101 """
101 """
102
102
103 from __future__ import absolute_import
103 from __future__ import absolute_import
104
104
105 import errno
105 import errno
106 import struct
106 import struct
107
107
108 from .i18n import _
108 from .i18n import _
109 from .node import (
109 from .node import (
110 bin,
110 bin,
111 hex,
111 hex,
112 nullid,
112 nullid,
113 nullrev,
113 nullrev,
114 short,
114 short,
115 )
115 )
116 from . import (
116 from . import (
117 error,
117 error,
118 smartset,
118 smartset,
119 txnutil,
119 txnutil,
120 util,
120 util,
121 )
121 )
122
122
123 _fphasesentry = struct.Struct('>i20s')
123 _fphasesentry = struct.Struct('>i20s')
124
124
125 allphases = public, draft, secret = range(3)
125 allphases = public, draft, secret = range(3)
126 trackedphases = allphases[1:]
126 trackedphases = allphases[1:]
127 phasenames = ['public', 'draft', 'secret']
127 phasenames = ['public', 'draft', 'secret']
128
128
129 def _readroots(repo, phasedefaults=None):
129 def _readroots(repo, phasedefaults=None):
130 """Read phase roots from disk
130 """Read phase roots from disk
131
131
132 phasedefaults is a list of fn(repo, roots) callables, which are
132 phasedefaults is a list of fn(repo, roots) callables, which are
133 executed if the phase roots file does not exist. When phases are
133 executed if the phase roots file does not exist. When phases are
134 being initialized on an existing repository, this could be used to
134 being initialized on an existing repository, this could be used to
135 set selected changesets' phase to something other than public.
135 set selected changesets' phase to something other than public.
136
136
137 Return (roots, dirty) where dirty is true if roots differ from
137 Return (roots, dirty) where dirty is true if roots differ from
138 what is being stored.
138 what is being stored.
139 """
139 """
140 repo = repo.unfiltered()
140 repo = repo.unfiltered()
141 dirty = False
141 dirty = False
142 roots = [set() for i in allphases]
142 roots = [set() for i in allphases]
143 try:
143 try:
144 f, pending = txnutil.trypending(repo.root, repo.svfs, 'phaseroots')
144 f, pending = txnutil.trypending(repo.root, repo.svfs, 'phaseroots')
145 try:
145 try:
146 for line in f:
146 for line in f:
147 phase, nh = line.split()
147 phase, nh = line.split()
148 roots[int(phase)].add(bin(nh))
148 roots[int(phase)].add(bin(nh))
149 finally:
149 finally:
150 f.close()
150 f.close()
151 except IOError as inst:
151 except IOError as inst:
152 if inst.errno != errno.ENOENT:
152 if inst.errno != errno.ENOENT:
153 raise
153 raise
154 if phasedefaults:
154 if phasedefaults:
155 for f in phasedefaults:
155 for f in phasedefaults:
156 roots = f(repo, roots)
156 roots = f(repo, roots)
157 dirty = True
157 dirty = True
158 return roots, dirty
158 return roots, dirty
159
159
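For reference, the 'phaseroots' file parsed above is plain text: one '<phase-int> <hex-node>' pair per line, as the split/bin calls show. A small self-contained parser in the same spirit; bytes.fromhex plays the role of node.bin here, and the node values are made up:

    def parsephaseroots(text):
        roots = [set(), set(), set()]        # public, draft, secret
        for line in text.splitlines():
            phase, nh = line.split()
            roots[int(phase)].add(bytes.fromhex(nh))
        return roots

    sample = '1 %s\n2 %s\n' % ('11' * 20, '22' * 20)
    roots = parsephaseroots(sample)
    assert len(roots[1]) == 1 and len(roots[2]) == 1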
160 def binaryencode(phasemapping):
160 def binaryencode(phasemapping):
161 """encode a 'phase -> nodes' mapping into a binary stream
161 """encode a 'phase -> nodes' mapping into a binary stream
162
162
163 Since phases are integers, the mapping is actually a python list:
163 Since phases are integers, the mapping is actually a python list:
164 [[PUBLIC_HEADS], [DRAFTS_HEADS], [SECRET_HEADS]]
164 [[PUBLIC_HEADS], [DRAFTS_HEADS], [SECRET_HEADS]]
165 """
165 """
166 binarydata = []
166 binarydata = []
167 for phase, nodes in enumerate(phasemapping):
167 for phase, nodes in enumerate(phasemapping):
168 for head in nodes:
168 for head in nodes:
169 binarydata.append(_fphasesentry.pack(phase, head))
169 binarydata.append(_fphasesentry.pack(phase, head))
170 return ''.join(binarydata)
170 return ''.join(binarydata)
171
171
172 def binarydecode(stream):
172 def binarydecode(stream):
173 """decode a binary stream into a 'phase -> nodes' mapping
173 """decode a binary stream into a 'phase -> nodes' mapping
174
174
175 Since phases are integers, the mapping is actually a python list."""
175 Since phases are integers, the mapping is actually a python list."""
176 headsbyphase = [[] for i in allphases]
176 headsbyphase = [[] for i in allphases]
177 entrysize = _fphasesentry.size
177 entrysize = _fphasesentry.size
178 while True:
178 while True:
179 entry = stream.read(entrysize)
179 entry = stream.read(entrysize)
180 if len(entry) < entrysize:
180 if len(entry) < entrysize:
181 if entry:
181 if entry:
182 raise error.Abort(_('bad phase-heads stream'))
182 raise error.Abort(_('bad phase-heads stream'))
183 break
183 break
184 phase, node = _fphasesentry.unpack(entry)
184 phase, node = _fphasesentry.unpack(entry)
185 headsbyphase[phase].append(node)
185 headsbyphase[phase].append(node)
186 return headsbyphase
186 return headsbyphase
187
187
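Each record in the phase-heads stream is fixed-size, as described by _fphasesentry = struct.Struct('>i20s'): a big-endian 32-bit phase number followed by a raw 20-byte node. A quick round trip over one entry shows the layout (the node value is a stand-in):

    import struct

    entry = struct.Struct('>i20s')      # 4 + 20 = 24 bytes per record
    node = b'\x11' * 20                 # stand-in for a changeset hash
    packed = entry.pack(1, node)        # phase 1 == draft
    assert entry.size == len(packed) == 24
    phase, decoded = entry.unpack(packed)
    assert (phase, decoded) == (1, node)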
188 def _trackphasechange(data, rev, old, new):
188 def _trackphasechange(data, rev, old, new):
189 """add a phase move the <data> dictionnary
189 """add a phase move the <data> dictionnary
190
190
191 If data is None, nothing happens.
191 If data is None, nothing happens.
192 """
192 """
193 if data is None:
193 if data is None:
194 return
194 return
195 existing = data.get(rev)
195 existing = data.get(rev)
196 if existing is not None:
196 if existing is not None:
197 old = existing[0]
197 old = existing[0]
198 data[rev] = (old, new)
198 data[rev] = (old, new)
199
199
200 class phasecache(object):
200 class phasecache(object):
201 def __init__(self, repo, phasedefaults, _load=True):
201 def __init__(self, repo, phasedefaults, _load=True):
202 if _load:
202 if _load:
203 # Cheap trick to allow shallow-copy without copy module
203 # Cheap trick to allow shallow-copy without copy module
204 self.phaseroots, self.dirty = _readroots(repo, phasedefaults)
204 self.phaseroots, self.dirty = _readroots(repo, phasedefaults)
205 self._phaserevs = None
205 self._phaserevs = None
206 self._phasesets = None
206 self._phasesets = None
207 self.filterunknown(repo)
207 self.filterunknown(repo)
208 self.opener = repo.svfs
208 self.opener = repo.svfs
209
209
210 def getrevset(self, repo, phases):
210 def getrevset(self, repo, phases):
211 """return a smartset for the given phases"""
211 """return a smartset for the given phases"""
212 self.loadphaserevs(repo) # ensure the phase sets are loaded
212 self.loadphaserevs(repo) # ensure the phase sets are loaded
213
213
214 if self._phasesets and all(self._phasesets[p] is not None
214 if self._phasesets and all(self._phasesets[p] is not None
215 for p in phases):
215 for p in phases):
216 # fast path - use _phasesets
216 # fast path - use _phasesets
217 revs = self._phasesets[phases[0]]
217 revs = self._phasesets[phases[0]]
218 if len(phases) > 1:
218 if len(phases) > 1:
219 revs = revs.copy() # only copy when needed
219 revs = revs.copy() # only copy when needed
220 for p in phases[1:]:
220 for p in phases[1:]:
221 revs.update(self._phasesets[p])
221 revs.update(self._phasesets[p])
222 if repo.changelog.filteredrevs:
222 if repo.changelog.filteredrevs:
223 revs = revs - repo.changelog.filteredrevs
223 revs = revs - repo.changelog.filteredrevs
224 return smartset.baseset(revs)
224 return smartset.baseset(revs)
225 else:
225 else:
226 # slow path - enumerate all revisions
226 # slow path - enumerate all revisions
227 phase = self.phase
227 phase = self.phase
228 revs = (r for r in repo if phase(repo, r) in phases)
228 revs = (r for r in repo if phase(repo, r) in phases)
229 return smartset.generatorset(revs, iterasc=True)
229 return smartset.generatorset(revs, iterasc=True)
230
230
231 def copy(self):
231 def copy(self):
232 # Shallow copy meant to ensure isolation in
232 # Shallow copy meant to ensure isolation in
233 # advance/retractboundary(), nothing more.
233 # advance/retractboundary(), nothing more.
234 ph = self.__class__(None, None, _load=False)
234 ph = self.__class__(None, None, _load=False)
235 ph.phaseroots = self.phaseroots[:]
235 ph.phaseroots = self.phaseroots[:]
236 ph.dirty = self.dirty
236 ph.dirty = self.dirty
237 ph.opener = self.opener
237 ph.opener = self.opener
238 ph._phaserevs = self._phaserevs
238 ph._phaserevs = self._phaserevs
239 ph._phasesets = self._phasesets
239 ph._phasesets = self._phasesets
240 return ph
240 return ph
241
241
242 def replace(self, phcache):
242 def replace(self, phcache):
243 """replace all values in 'self' with content of phcache"""
243 """replace all values in 'self' with content of phcache"""
244 for a in ('phaseroots', 'dirty', 'opener', '_phaserevs', '_phasesets'):
244 for a in ('phaseroots', 'dirty', 'opener', '_phaserevs', '_phasesets'):
245 setattr(self, a, getattr(phcache, a))
245 setattr(self, a, getattr(phcache, a))
246
246
247 def _getphaserevsnative(self, repo):
247 def _getphaserevsnative(self, repo):
248 repo = repo.unfiltered()
248 repo = repo.unfiltered()
249 nativeroots = []
249 nativeroots = []
250 for phase in trackedphases:
250 for phase in trackedphases:
251 nativeroots.append(map(repo.changelog.rev, self.phaseroots[phase]))
251 nativeroots.append(map(repo.changelog.rev, self.phaseroots[phase]))
252 return repo.changelog.computephases(nativeroots)
252 return repo.changelog.computephases(nativeroots)
253
253
254 def _computephaserevspure(self, repo):
254 def _computephaserevspure(self, repo):
255 repo = repo.unfiltered()
255 repo = repo.unfiltered()
256 revs = [public] * len(repo.changelog)
256 revs = [public] * len(repo.changelog)
257 self._phaserevs = revs
257 self._phaserevs = revs
258 self._populatephaseroots(repo)
258 self._populatephaseroots(repo)
259 for phase in trackedphases:
259 for phase in trackedphases:
260 roots = list(map(repo.changelog.rev, self.phaseroots[phase]))
260 roots = list(map(repo.changelog.rev, self.phaseroots[phase]))
261 if roots:
261 if roots:
262 for rev in roots:
262 for rev in roots:
263 revs[rev] = phase
263 revs[rev] = phase
264 for rev in repo.changelog.descendants(roots):
264 for rev in repo.changelog.descendants(roots):
265 revs[rev] = phase
265 revs[rev] = phase
266
266
267 def loadphaserevs(self, repo):
267 def loadphaserevs(self, repo):
268 """ensure phase information is loaded in the object"""
268 """ensure phase information is loaded in the object"""
269 if self._phaserevs is None:
269 if self._phaserevs is None:
270 try:
270 try:
271 res = self._getphaserevsnative(repo)
271 res = self._getphaserevsnative(repo)
272 self._phaserevs, self._phasesets = res
272 self._phaserevs, self._phasesets = res
273 except AttributeError:
273 except AttributeError:
274 self._computephaserevspure(repo)
274 self._computephaserevspure(repo)
275
275
276 def invalidate(self):
276 def invalidate(self):
277 self._phaserevs = None
277 self._phaserevs = None
278 self._phasesets = None
278 self._phasesets = None
279
279
280 def _populatephaseroots(self, repo):
280 def _populatephaseroots(self, repo):
281 """Fills the _phaserevs cache with phases for the roots.
281 """Fills the _phaserevs cache with phases for the roots.
282 """
282 """
283 cl = repo.changelog
283 cl = repo.changelog
284 phaserevs = self._phaserevs
284 phaserevs = self._phaserevs
285 for phase in trackedphases:
285 for phase in trackedphases:
286 roots = map(cl.rev, self.phaseroots[phase])
286 roots = map(cl.rev, self.phaseroots[phase])
287 for root in roots:
287 for root in roots:
288 phaserevs[root] = phase
288 phaserevs[root] = phase
289
289
290 def phase(self, repo, rev):
290 def phase(self, repo, rev):
291 # We need a repo argument here to be able to build _phaserevs
291 # We need a repo argument here to be able to build _phaserevs
292 # if necessary. The repository instance is not stored in
292 # if necessary. The repository instance is not stored in
293 # phasecache to avoid reference cycles. The changelog instance
293 # phasecache to avoid reference cycles. The changelog instance
294 # is not stored because it is a filecache() property and can
294 # is not stored because it is a filecache() property and can
295 # be replaced without us being notified.
295 # be replaced without us being notified.
296 if rev == nullrev:
296 if rev == nullrev:
297 return public
297 return public
298 if rev < nullrev:
298 if rev < nullrev:
299 raise ValueError(_('cannot lookup negative revision'))
299 raise ValueError(_('cannot lookup negative revision'))
300 if self._phaserevs is None or rev >= len(self._phaserevs):
300 if self._phaserevs is None or rev >= len(self._phaserevs):
301 self.invalidate()
301 self.invalidate()
302 self.loadphaserevs(repo)
302 self.loadphaserevs(repo)
303 return self._phaserevs[rev]
303 return self._phaserevs[rev]
304
304
305 def write(self):
305 def write(self):
306 if not self.dirty:
306 if not self.dirty:
307 return
307 return
308 f = self.opener('phaseroots', 'w', atomictemp=True, checkambig=True)
308 f = self.opener('phaseroots', 'w', atomictemp=True, checkambig=True)
309 try:
309 try:
310 self._write(f)
310 self._write(f)
311 finally:
311 finally:
312 f.close()
312 f.close()
313
313
314 def _write(self, fp):
314 def _write(self, fp):
315 for phase, roots in enumerate(self.phaseroots):
315 for phase, roots in enumerate(self.phaseroots):
316 for h in roots:
316 for h in roots:
317 fp.write('%i %s\n' % (phase, hex(h)))
317 fp.write('%i %s\n' % (phase, hex(h)))
318 self.dirty = False
318 self.dirty = False
319
319
320 def _updateroots(self, phase, newroots, tr):
320 def _updateroots(self, phase, newroots, tr):
321 self.phaseroots[phase] = newroots
321 self.phaseroots[phase] = newroots
322 self.invalidate()
322 self.invalidate()
323 self.dirty = True
323 self.dirty = True
324
324
325 tr.addfilegenerator('phase', ('phaseroots',), self._write)
325 tr.addfilegenerator('phase', ('phaseroots',), self._write)
326 tr.hookargs['phases_moved'] = '1'
326 tr.hookargs['phases_moved'] = '1'
327
327
328 def registernew(self, repo, tr, targetphase, nodes):
328 def registernew(self, repo, tr, targetphase, nodes):
329 repo = repo.unfiltered()
329 repo = repo.unfiltered()
330 self._retractboundary(repo, tr, targetphase, nodes)
330 self._retractboundary(repo, tr, targetphase, nodes)
331 if tr is not None and 'phases' in tr.changes:
331 if tr is not None and 'phases' in tr.changes:
332 phasetracking = tr.changes['phases']
332 phasetracking = tr.changes['phases']
333 torev = repo.changelog.rev
333 torev = repo.changelog.rev
334 phase = self.phase
334 phase = self.phase
335 for n in nodes:
335 for n in nodes:
336 rev = torev(n)
336 rev = torev(n)
337 revphase = phase(repo, rev)
337 revphase = phase(repo, rev)
338 _trackphasechange(phasetracking, rev, None, revphase)
338 _trackphasechange(phasetracking, rev, None, revphase)
339 repo.invalidatevolatilesets()
339 repo.invalidatevolatilesets()
340
340
341 def advanceboundary(self, repo, tr, targetphase, nodes):
341 def advanceboundary(self, repo, tr, targetphase, nodes):
342 """Set all 'nodes' to phase 'targetphase'
342 """Set all 'nodes' to phase 'targetphase'
343
343
344 Nodes with a phase lower than 'targetphase' are not affected.
344 Nodes with a phase lower than 'targetphase' are not affected.
345 """
345 """
346 # Be careful to preserve shallow-copied values: do not update
346 # Be careful to preserve shallow-copied values: do not update
347 # phaseroots values, replace them.
347 # phaseroots values, replace them.
348 if tr is None:
348 if tr is None:
349 phasetracking = None
349 phasetracking = None
350 else:
350 else:
351 phasetracking = tr.changes.get('phases')
351 phasetracking = tr.changes.get('phases')
352
352
353 repo = repo.unfiltered()
353 repo = repo.unfiltered()
354
354
355 delroots = [] # set of roots deleted by this path
355 delroots = [] # set of roots deleted by this path
356 for phase in xrange(targetphase + 1, len(allphases)):
356 for phase in xrange(targetphase + 1, len(allphases)):
357 # filter nodes that are not in a compatible phase already
357 # filter nodes that are not in a compatible phase already
358 nodes = [n for n in nodes
358 nodes = [n for n in nodes
359 if self.phase(repo, repo[n].rev()) >= phase]
359 if self.phase(repo, repo[n].rev()) >= phase]
360 if not nodes:
360 if not nodes:
361 break # no roots to move anymore
361 break # no roots to move anymore
362
362
363 olds = self.phaseroots[phase]
363 olds = self.phaseroots[phase]
364
364
365 affected = repo.revs('%ln::%ln', olds, nodes)
365 affected = repo.revs('%ln::%ln', olds, nodes)
366 for r in affected:
366 for r in affected:
367 _trackphasechange(phasetracking, r, self.phase(repo, r),
367 _trackphasechange(phasetracking, r, self.phase(repo, r),
368 targetphase)
368 targetphase)
369
369
370 roots = set(ctx.node() for ctx in repo.set(
370 roots = set(ctx.node() for ctx in repo.set(
371 'roots((%ln::) - %ld)', olds, affected))
371 'roots((%ln::) - %ld)', olds, affected))
372 if olds != roots:
372 if olds != roots:
373 self._updateroots(phase, roots, tr)
373 self._updateroots(phase, roots, tr)
374 # some roots may need to be declared for lower phases
374 # some roots may need to be declared for lower phases
375 delroots.extend(olds - roots)
375 delroots.extend(olds - roots)
376 # declare deleted root in the target phase
376 # declare deleted root in the target phase
377 if targetphase != 0:
377 if targetphase != 0:
378 self._retractboundary(repo, tr, targetphase, delroots)
378 self._retractboundary(repo, tr, targetphase, delroots)
379 repo.invalidatevolatilesets()
379 repo.invalidatevolatilesets()
380
380
381 def retractboundary(self, repo, tr, targetphase, nodes):
381 def retractboundary(self, repo, tr, targetphase, nodes):
382 oldroots = self.phaseroots[:targetphase + 1]
382 oldroots = self.phaseroots[:targetphase + 1]
383 if tr is None:
383 if tr is None:
384 phasetracking = None
384 phasetracking = None
385 else:
385 else:
386 phasetracking = tr.changes.get('phases')
386 phasetracking = tr.changes.get('phases')
387 repo = repo.unfiltered()
387 repo = repo.unfiltered()
388 if (self._retractboundary(repo, tr, targetphase, nodes)
388 if (self._retractboundary(repo, tr, targetphase, nodes)
389 and phasetracking is not None):
389 and phasetracking is not None):
390
390
391 # find the affected revisions
391 # find the affected revisions
392 new = self.phaseroots[targetphase]
392 new = self.phaseroots[targetphase]
393 old = oldroots[targetphase]
393 old = oldroots[targetphase]
394 affected = set(repo.revs('(%ln::) - (%ln::)', new, old))
394 affected = set(repo.revs('(%ln::) - (%ln::)', new, old))
395
395
396 # find the phase of the affected revisions
396 # find the phase of the affected revisions
397 for phase in xrange(targetphase, -1, -1):
397 for phase in xrange(targetphase, -1, -1):
398 if phase:
398 if phase:
399 roots = oldroots[phase]
399 roots = oldroots[phase]
400 revs = set(repo.revs('%ln::%ld', roots, affected))
400 revs = set(repo.revs('%ln::%ld', roots, affected))
401 affected -= revs
401 affected -= revs
402 else: # public phase
402 else: # public phase
403 revs = affected
403 revs = affected
404 for r in revs:
404 for r in revs:
405 _trackphasechange(phasetracking, r, phase, targetphase)
405 _trackphasechange(phasetracking, r, phase, targetphase)
406 repo.invalidatevolatilesets()
406 repo.invalidatevolatilesets()
407
407
408 def _retractboundary(self, repo, tr, targetphase, nodes):
408 def _retractboundary(self, repo, tr, targetphase, nodes):
409 # Be careful to preserve shallow-copied values: do not update
409 # Be careful to preserve shallow-copied values: do not update
410 # phaseroots values, replace them.
410 # phaseroots values, replace them.
411
411
412 repo = repo.unfiltered()
412 repo = repo.unfiltered()
413 currentroots = self.phaseroots[targetphase]
413 currentroots = self.phaseroots[targetphase]
414 finalroots = oldroots = set(currentroots)
414 finalroots = oldroots = set(currentroots)
415 newroots = [n for n in nodes
415 newroots = [n for n in nodes
416 if self.phase(repo, repo[n].rev()) < targetphase]
416 if self.phase(repo, repo[n].rev()) < targetphase]
417 if newroots:
417 if newroots:
418
418
419 if nullid in newroots:
419 if nullid in newroots:
420 raise error.Abort(_('cannot change null revision phase'))
420 raise error.Abort(_('cannot change null revision phase'))
421 currentroots = currentroots.copy()
421 currentroots = currentroots.copy()
422 currentroots.update(newroots)
422 currentroots.update(newroots)
423
423
424 # Only compute new roots for revs above the roots that are being
424 # Only compute new roots for revs above the roots that are being
425 # retracted.
425 # retracted.
426 minnewroot = min(repo[n].rev() for n in newroots)
426 minnewroot = min(repo[n].rev() for n in newroots)
427 aboveroots = [n for n in currentroots
427 aboveroots = [n for n in currentroots
428 if repo[n].rev() >= minnewroot]
428 if repo[n].rev() >= minnewroot]
429 updatedroots = repo.set('roots(%ln::)', aboveroots)
429 updatedroots = repo.set('roots(%ln::)', aboveroots)
430
430
431 finalroots = set(n for n in currentroots if repo[n].rev() <
431 finalroots = set(n for n in currentroots if repo[n].rev() <
432 minnewroot)
432 minnewroot)
433 finalroots.update(ctx.node() for ctx in updatedroots)
433 finalroots.update(ctx.node() for ctx in updatedroots)
434 if finalroots != oldroots:
434 if finalroots != oldroots:
435 self._updateroots(targetphase, finalroots, tr)
435 self._updateroots(targetphase, finalroots, tr)
436 return True
436 return True
437 return False
437 return False
438
438
439 def filterunknown(self, repo):
439 def filterunknown(self, repo):
440 """remove unknown nodes from the phase boundary
440 """remove unknown nodes from the phase boundary
441
441
442 Nothing is lost as unknown nodes only hold data for their descendants.
442 Nothing is lost as unknown nodes only hold data for their descendants.
443 """
443 """
444 filtered = False
444 filtered = False
445 nodemap = repo.changelog.nodemap # to filter unknown nodes
445 nodemap = repo.changelog.nodemap # to filter unknown nodes
446 for phase, nodes in enumerate(self.phaseroots):
446 for phase, nodes in enumerate(self.phaseroots):
447 missing = sorted(node for node in nodes if node not in nodemap)
447 missing = sorted(node for node in nodes if node not in nodemap)
448 if missing:
448 if missing:
449 for mnode in missing:
449 for mnode in missing:
450 repo.ui.debug(
450 repo.ui.debug(
451 'removing unknown node %s from %i-phase boundary\n'
451 'removing unknown node %s from %i-phase boundary\n'
452 % (short(mnode), phase))
452 % (short(mnode), phase))
453 nodes.symmetric_difference_update(missing)
453 nodes.symmetric_difference_update(missing)
454 filtered = True
454 filtered = True
455 if filtered:
455 if filtered:
456 self.dirty = True
456 self.dirty = True
457 # filterunknown is called by repo.destroyed; we may have no changes in
457 # filterunknown is called by repo.destroyed; we may have no changes in
458 # roots but the phaserevs contents are certainly invalid (or at least we
458 # roots but the phaserevs contents are certainly invalid (or at least we
459 # have no proper way to check that). Related to issue 3858.
459 # have no proper way to check that). Related to issue 3858.
460 #
460 #
461 # The other caller is __init__, which has no _phaserevs initialized
461 # The other caller is __init__, which has no _phaserevs initialized
462 # anyway. If this changes, we should consider adding a dedicated
462 # anyway. If this changes, we should consider adding a dedicated
463 # "destroyed" function to phasecache or a proper cache key mechanism
463 # "destroyed" function to phasecache or a proper cache key mechanism
464 # (see the branchmap one)
464 # (see the branchmap one)
465 self.invalidate()
465 self.invalidate()
466
466
467 def advanceboundary(repo, tr, targetphase, nodes):
467 def advanceboundary(repo, tr, targetphase, nodes):
468 """Add nodes to a phase changing other nodes phases if necessary.
468 """Add nodes to a phase changing other nodes phases if necessary.
469
469
470 This function move boundary *forward* this means that all nodes
470 This function move boundary *forward* this means that all nodes
471 are set in the target phase or kept in a *lower* phase.
471 are set in the target phase or kept in a *lower* phase.
472
472
473 Simplify boundary to contains phase roots only."""
473 Simplify boundary to contains phase roots only."""
474 phcache = repo._phasecache.copy()
474 phcache = repo._phasecache.copy()
475 phcache.advanceboundary(repo, tr, targetphase, nodes)
475 phcache.advanceboundary(repo, tr, targetphase, nodes)
476 repo._phasecache.replace(phcache)
476 repo._phasecache.replace(phcache)
477
477
478 def retractboundary(repo, tr, targetphase, nodes):
478 def retractboundary(repo, tr, targetphase, nodes):
479 """Set nodes back to a phase changing other nodes phases if
479 """Set nodes back to a phase changing other nodes phases if
480 necessary.
480 necessary.
481
481
482 This function move boundary *backward* this means that all nodes
482 This function move boundary *backward* this means that all nodes
483 are set in the target phase or kept in a *higher* phase.
483 are set in the target phase or kept in a *higher* phase.
484
484
485 Simplify boundary to contains phase roots only."""
485 Simplify boundary to contains phase roots only."""
486 phcache = repo._phasecache.copy()
486 phcache = repo._phasecache.copy()
487 phcache.retractboundary(repo, tr, targetphase, nodes)
487 phcache.retractboundary(repo, tr, targetphase, nodes)
488 repo._phasecache.replace(phcache)
488 repo._phasecache.replace(phcache)
489
489
490 def registernew(repo, tr, targetphase, nodes):
490 def registernew(repo, tr, targetphase, nodes):
491 """register a new revision and its phase
491 """register a new revision and its phase
492
492
493 Code adding revisions to the repository should use this function to
493 Code adding revisions to the repository should use this function to
494 set new changesets in their target phase (or higher).
494 set new changesets in their target phase (or higher).
495 """
495 """
496 phcache = repo._phasecache.copy()
496 phcache = repo._phasecache.copy()
497 phcache.registernew(repo, tr, targetphase, nodes)
497 phcache.registernew(repo, tr, targetphase, nodes)
498 repo._phasecache.replace(phcache)
498 repo._phasecache.replace(phcache)
499
499
500 def listphases(repo):
500 def listphases(repo):
501 """List phases root for serialization over pushkey"""
501 """List phases root for serialization over pushkey"""
502 # Use ordered dictionary so behavior is deterministic.
502 # Use ordered dictionary so behavior is deterministic.
503 keys = util.sortdict()
503 keys = util.sortdict()
504 value = '%i' % draft
504 value = '%i' % draft
505 for root in repo._phasecache.phaseroots[draft]:
505 for root in repo._phasecache.phaseroots[draft]:
506 keys[hex(root)] = value
506 keys[hex(root)] = value
507
507
508 if repo.publishing():
508 if repo.publishing():
509 # Add an extra data entry to let the remote know we are a publishing
509 # Add an extra data entry to let the remote know we are a publishing
510 # repo. Publishing repos can't just pretend they are old repos.
510 # repo. Publishing repos can't just pretend they are old repos.
511 # When pushing to a publishing repo, the client still needs to
511 # When pushing to a publishing repo, the client still needs to
512 # push the phase boundary.
512 # push the phase boundary.
513 #
513 #
514 # A push does not only push changesets. It also pushes phase data.
514 # A push does not only push changesets. It also pushes phase data.
515 # New phase data may apply to common changesets which won't be
515 # New phase data may apply to common changesets which won't be
516 # pushed (as they are common). Here is a very simple example:
516 # pushed (as they are common). Here is a very simple example:
517 #
517 #
518 # 1) repo A pushes changeset X as draft to repo B
518 # 1) repo A pushes changeset X as draft to repo B
519 # 2) repo B makes changeset X public
519 # 2) repo B makes changeset X public
520 # 3) repo B pushes to repo A. X is not pushed, but the data that
520 # 3) repo B pushes to repo A. X is not pushed, but the data that
521 # X is now public should be
521 # X is now public should be
522 #
522 #
523 # The server can't handle it on its own as it has no idea of the
523 # The server can't handle it on its own as it has no idea of the
524 # client's phase data.
524 # client's phase data.
525 keys['publishing'] = 'True'
525 keys['publishing'] = 'True'
526 return keys
526 return keys
527
527
528 def pushphase(repo, nhex, oldphasestr, newphasestr):
528 def pushphase(repo, nhex, oldphasestr, newphasestr):
529 """List phases root for serialization over pushkey"""
529 """List phases root for serialization over pushkey"""
530 repo = repo.unfiltered()
530 repo = repo.unfiltered()
531 with repo.lock():
531 with repo.lock():
532 currentphase = repo[nhex].phase()
532 currentphase = repo[nhex].phase()
533 newphase = abs(int(newphasestr)) # let's avoid negative index surprise
533 newphase = abs(int(newphasestr)) # let's avoid negative index surprise
534 oldphase = abs(int(oldphasestr)) # let's avoid negative index surprise
534 oldphase = abs(int(oldphasestr)) # let's avoid negative index surprise
535 if currentphase == oldphase and newphase < oldphase:
535 if currentphase == oldphase and newphase < oldphase:
536 with repo.transaction('pushkey-phase') as tr:
536 with repo.transaction('pushkey-phase') as tr:
537 advanceboundary(repo, tr, newphase, [bin(nhex)])
537 advanceboundary(repo, tr, newphase, [bin(nhex)])
538 return True
538 return True
539 elif currentphase == newphase:
539 elif currentphase == newphase:
540 # raced, but got correct result
540 # raced, but got correct result
541 return True
541 return True
542 else:
542 else:
543 return False
543 return False
544
544
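pushphase is essentially a compare-and-swap on the node's phase: the move is applied only if the server still sees the phase the client based its request on, and a lost race that nonetheless landed on the requested phase is still reported as success. The decision logic in isolation (a sketch; the function name and asserts are illustrative):

    def phase_cas(current, old, new):
        # True when the pushkey phase move should report success.
        if current == old and new < old:
            return True    # expected state: advance the boundary
        if current == new:
            return True    # raced, but ended in the requested phase anyway
        return False       # genuine conflict: the client must resync

    assert phase_cas(current=1, old=1, new=0)      # draft -> public
    assert phase_cas(current=0, old=1, new=0)      # someone else already did it
    assert not phase_cas(current=2, old=1, new=0)  # unexpected state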
545 def subsetphaseheads(repo, subset):
545 def subsetphaseheads(repo, subset):
546 """Finds the phase heads for a subset of a history
546 """Finds the phase heads for a subset of a history
547
547
548 Returns a list indexed by phase number where each item is a list of phase
548 Returns a list indexed by phase number where each item is a list of phase
549 head nodes.
549 head nodes.
550 """
550 """
551 cl = repo.changelog
551 cl = repo.changelog
552
552
553 headsbyphase = [[] for i in allphases]
553 headsbyphase = [[] for i in allphases]
554 # No need to keep track of secret phase; any heads in the subset that
554 # No need to keep track of secret phase; any heads in the subset that
555 # are not mentioned are implicitly secret.
555 # are not mentioned are implicitly secret.
556 for phase in allphases[:-1]:
556 for phase in allphases[:-1]:
557 revset = "heads(%%ln & %s())" % phasenames[phase]
557 revset = "heads(%%ln & %s())" % phasenames[phase]
558 headsbyphase[phase] = [cl.node(r) for r in repo.revs(revset, subset)]
558 headsbyphase[phase] = [cl.node(r) for r in repo.revs(revset, subset)]
559 return headsbyphase
559 return headsbyphase
560
560
def updatephases(repo, trgetter, headsbyphase):
    """Updates the repo with the given phase heads"""
    # Now advance phase boundaries of all but secret phase
    #
    # run the update (and fetch the transaction) only if there are actually
    # things to update. This avoids creating an empty transaction during
    # no-op operations.

    for phase in allphases[:-1]:
        revset = '%%ln - %s()' % phasenames[phase]
        heads = [c.node() for c in repo.set(revset, headsbyphase[phase])]
        if heads:
            advanceboundary(repo, trgetter(), phase, heads)

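# Illustrative sketch (not in the original module): the new calling
# convention introduced by this change. Callers now pass a zero-argument
# callable instead of a live transaction, so one is only opened when a phase
# boundary actually moves (the bundle2 'phase-heads' handler passes its
# op.gettransaction for this). The lazy-transaction plumbing below is a
# hypothetical stand-in for that caller.
def _example_updatephases(repo, headsbyphase):
    with repo.lock():
        tr = [None]
        def trgetter():
            # open the transaction on first use only
            if tr[0] is None:
                tr[0] = repo.transaction('phase-heads')
            return tr[0]
        try:
            updatephases(repo, trgetter, headsbyphase)
            if tr[0] is not None:
                tr[0].close()
        finally:
            if tr[0] is not None:
                tr[0].release()
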
def analyzeremotephases(repo, subset, roots):
    """Compute phase heads and roots in a subset of nodes from a roots dict

    * subset is the heads of the subset
    * roots is a {<nodeid> => phase} mapping. key and value are strings.

    Unknown elements in the input are accepted (and ignored).
    """
    repo = repo.unfiltered()
    # build list from dictionary
    draftroots = []
    nodemap = repo.changelog.nodemap # to filter unknown nodes
    for nhex, phase in roots.iteritems():
        if nhex == 'publishing': # ignore data related to publish option
            continue
        node = bin(nhex)
        phase = int(phase)
        if phase == public:
            if node != nullid:
                repo.ui.warn(_('ignoring inconsistent public root'
                               ' from remote: %s\n') % nhex)
        elif phase == draft:
            if node in nodemap:
                draftroots.append(node)
        else:
            repo.ui.warn(_('ignoring unexpected root from remote: %i %s\n')
                         % (phase, nhex))
    # compute heads
    publicheads = newheads(repo, subset, draftroots)
    return publicheads, draftroots

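# Illustrative sketch (not in the original module): feeding a peer's pushkey
# 'phases' listing into analyzeremotephases. The `remote` peer object is an
# assumption; listkeys('phases') is the standard wire call producing the
# roots dict consumed above.
def _example_analyzeremotephases(repo, remote):
    remoteroots = remote.listkeys('phases')
    subset = [repo.changelog.node(r) for r in repo.revs('heads(all())')]
    return analyzeremotephases(repo, subset, remoteroots)
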
def newheads(repo, heads, roots):
    """compute the new heads of a subset minus another

    * `heads`: defines the first subset
    * `roots`: defines the second subset, which we subtract from the first"""
    repo = repo.unfiltered()
    revset = repo.set('heads((%ln + parents(%ln)) - (%ln::%ln))',
                      heads, roots, roots, heads)
    return [c.node() for c in revset]


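# Reading the revset above (explanatory note, not in the original module):
# it takes the given heads plus the parents of the roots (so that when a head
# is carved away as non-public, the public parent of the excluded root can
# take over as a head), removes every node lying between a root and a head
# (roots::heads), and returns the heads of what remains.
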
def newcommitphase(ui):
    """helper to get the target phase of a new commit

    Handles all possible values of the phases.new-commit option.

    """
    v = ui.config('phases', 'new-commit', draft)
    try:
        return phasenames.index(v)
    except ValueError:
        try:
            return int(v)
        except ValueError:
            msg = _("phases.new-commit: not a valid phase name ('%s')")
            raise error.ConfigError(msg % v)

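# Illustrative sketch (not in the original module): resolving the target
# phase from configuration. The setconfig source tag is hypothetical.
def _example_newcommitphase(ui):
    # "[phases] new-commit = secret" maps to phasenames.index('secret') == 2;
    # a bare integer such as "new-commit = 1" is accepted as well
    ui.setconfig('phases', 'new-commit', 'secret', 'example')
    return newcommitphase(ui)
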
def hassecret(repo):
    """utility function that checks whether a repo has any secret changesets."""
    return bool(repo._phasecache.phaseroots[2])