phases: move binary encoding into a reusable function...
Boris Feld
r34320:5779d096 default
@@ -1,1939 +1,1934 @@
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application-agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows

- magic string
- stream level parameters
- payload parts (any number)
- end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

The binary format is as follows

:params size: int32

The total number of bytes used by the parameters

:params value: arbitrary number of bytes

A blob of `params size` containing the serialized version of all stream level
parameters.

The blob contains a space-separated list of parameters. Parameters with a
value are stored in the form `<name>=<value>`. Both name and value are
urlquoted.

Empty names are forbidden.

Names MUST start with a letter. If the first letter is lower case, the
parameter is advisory and can be safely ignored. However, when the first
letter is capital, the parameter is mandatory and the bundling process MUST
stop if it is not able to process it.

Stream parameters use a simple textual format for two main reasons:

- Stream level parameters should remain simple and we want to discourage any
  crazy usage.
- Textual data allows easy human inspection of a bundle2 header in case of
  trouble.

Any application-level options MUST go into a bundle2 part instead.

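The textual layout above is simple enough to sketch a codec for it. The
helpers below are illustrative only, not part of bundle2.py (the real code
uses `urlreq` and lives in `bundle20._paramchunk` and
`unbundle20._processallparams`)::

    # Sketch of the documented stream-parameter layout; helper names are
    # hypothetical, not part of bundle2.py.
    import struct
    import urllib.parse

    def encodestreamparams(params):
        # params: list of (name, value-or-None) pairs
        blocks = []
        for name, value in params:
            name = urllib.parse.quote(name)
            if value is not None:
                name = '%s=%s' % (name, urllib.parse.quote(value))
            blocks.append(name)
        blob = ' '.join(blocks).encode('ascii')
        # int32 size prefix, then the space-separated urlquoted blob
        return struct.pack('>i', len(blob)) + blob

    def decodestreamparams(data):
        size = struct.unpack('>i', data[:4])[0]
        blob = data[4:4 + size].decode('ascii')
        params = {}
        for p in blob.split(' '):
            if not p:
                continue
            name, _, value = p.partition('=')
            params[urllib.parse.unquote(name)] = (
                urllib.parse.unquote(value) if value else None)
        return params

    assert decodestreamparams(
        encodestreamparams([('Compression', 'BZ')])) == {'Compression': 'BZ'}
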
Payload part
------------------------

The binary format is as follows

:header size: int32

The total number of bytes used by the part header. When the header is empty
(size = 0) this is interpreted as the end of stream marker.

:header:

The header defines how to interpret the part. It contains two pieces of
data: the part type, and the part parameters.

The part type is used to route to an application level handler, which can
interpret the payload.

Part parameters are passed to the application level handler. They are
meant to convey information that will help the application level handler
interpret the part payload.

The binary format of the header is as follows

:typesize: (one byte)

:parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

:partid: A 32-bit integer (unique in the bundle) that can be used to refer
to this part.

:parameters:

Part parameters may have arbitrary content, the binary structure is::

    <mandatory-count><advisory-count><param-sizes><param-data>

:mandatory-count: 1 byte, number of mandatory parameters

:advisory-count: 1 byte, number of advisory parameters

:param-sizes:

N pairs of bytes, where N is the total number of parameters. Each
pair contains (<size-of-key>, <size-of-value>) for one parameter.

:param-data:

A blob of bytes from which each parameter key and value can be
retrieved using the list of size pairs stored in the previous
field.

Mandatory parameters come first, then the advisory ones.

Each parameter's key MUST be unique within the part.

:payload:

payload is a series of `<chunksize><chunkdata>`.

`chunksize` is an int32, `chunkdata` are plain bytes (as much as
`chunksize` says). The payload part is concluded by a zero-size chunk.

The current implementation always produces either zero or one chunk.
This is an implementation limitation that will ultimately be lifted.

`chunksize` can be negative to trigger special case processing. No such
processing is in place yet.

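The layout above is complete enough to parse a part by hand. A minimal
reader following the documented header and payload structure could look like
this (a sketch, not the `unbundlepart` implementation; `readexactly` is a
stand-in for the helper in `changegroup`)::

    import struct

    def readexactly(fp, n):
        data = fp.read(n)
        assert len(data) == n, 'stream ended early'
        return data

    def readpart(fp):
        headersize = struct.unpack('>i', readexactly(fp, 4))[0]
        if headersize == 0:
            return None  # end of stream marker
        header = readexactly(fp, headersize)
        typesize = struct.unpack('>B', header[:1])[0]
        parttype = header[1:1 + typesize]
        offset = 1 + typesize
        partid = struct.unpack('>I', header[offset:offset + 4])[0]
        offset += 4
        mancount, advcount = struct.unpack('>BB', header[offset:offset + 2])
        offset += 2
        nbparams = mancount + advcount
        sizes = struct.unpack('>' + 'BB' * nbparams,
                              header[offset:offset + 2 * nbparams])
        offset += 2 * nbparams
        params = []
        for i in range(nbparams):
            ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
            key = header[offset:offset + ksize]
            offset += ksize
            value = header[offset:offset + vsize]
            offset += vsize
            params.append((key, value))
        payload = b''
        while True:
            chunksize = struct.unpack('>i', readexactly(fp, 4))[0]
            if chunksize == 0:
                break  # zero-size chunk concludes the payload
            # negative (special case) chunk sizes are not handled here
            payload += readexactly(fp, chunksize)
        return parttype, partid, params, payload
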
Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part type
contains any uppercase char it is considered mandatory. When no handler is
known for a mandatory part, the process is aborted and an exception is raised.
If the part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read after an abort are processed. In
the future, dropping the stream may become an option for channels we do not
care to preserve.
"""

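The routing rule above is small enough to sketch. A hypothetical dispatcher,
assuming the `parthandlermapping` registry defined later in this file::

    # Sketch of the case-insensitive match / case-sensitive mandatory rule.
    def dispatch(parttype, parthandlermapping):
        handler = parthandlermapping.get(parttype.lower())
        mandatory = parttype != parttype.lower()  # any uppercase => mandatory
        if handler is None:
            if mandatory:
                raise KeyError('no handler for mandatory part %r' % parttype)
            return None  # advisory part with no handler: ignored
        return handler
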
from __future__ import absolute_import, division

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    phases,
    pushkey,
    pycompat,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

-_fphasesentry = struct.Struct('>i20s')
-
preferedchunksize = 4096

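This hunk is the actual change of the commit: `_fphasesentry` leaves
bundle2.py so that, per the commit message, the binary encoding moves into a
reusable function in the `phases` module. For reference, a sketch of what
the removed format packs (one entry per (phase, node) pair; the helper names
below are illustrative, not the new phases API)::

    import struct

    _fphasesentry = struct.Struct('>i20s')  # int32 phase + 20-byte node

    def encodephaseentry(phase, node):
        assert len(node) == 20  # binary sha1 node length
        return _fphasesentry.pack(phase, node)

    def decodephaseentry(data):
        phase, node = _fphasesentry.unpack(data[:_fphasesentry.size])
        return phase, node
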
_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>' + ('BB' * nbparams)

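For example, `_makefpartparamsizes(2)` returns `'>BBBB'`, one (key-size,
value-size) byte pair per parameter::

    import struct

    fmt = '>' + ('BB' * 2)          # what _makefpartparamsizes(2) returns
    data = struct.pack(fmt, 3, 5, 4, 0)
    sizes = struct.unpack(fmt, data)
    # -> (3, 5, 4, 0): first param has a 3-byte key and a 5-byte value,
    #    the second a 4-byte key and an empty value
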
parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`. Where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__

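A usage sketch for `unbundlerecords`, following its docstring::

    records = unbundlerecords()
    records.add('changegroup', {'return': 1})
    records.add('output', 'remote: ok', inreplyto=0)

    records['changegroup']            # -> ({'return': 1},)
    list(records)                     # chronological ('category', entry) pairs
    records.getreplies(0)['output']   # -> ('remote: ok',)
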
class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.reply = None
        self.captureoutput = captureoutput
        self.hookargs = {}
        self._gettransaction = transactiongetter

    def gettransaction(self):
        transaction = self._gettransaction()

        if self.hookargs:
            # the ones added to the transaction supersede those added
            # to the operation.
            self.hookargs.update(transaction.hookargs)
            transaction.hookargs = self.hookargs

            # mark the hookargs as flushed. further attempts to add to
            # hookargs will result in an abort.
            self.hookargs = None

        return transaction

    def addhookargs(self, hookargs):
        if self.hookargs is None:
            raise error.ProgrammingError('attempted to add hookargs to '
                                         'operation after transaction started')
        self.hookargs.update(hookargs)

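A sketch of the hookargs lifecycle this class enforces (assuming a `repo`
and a `transactiongetter` are at hand)::

    op = bundleoperation(repo, transactiongetter)
    op.addhookargs({'source': 'push'})   # ok: no transaction yet
    tr = op.gettransaction()             # hookargs flushed into tr.hookargs
    # op.addhookargs({'url': '...'})     # would raise ProgrammingError now
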
class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op

class partiterator(object):
    def __init__(self, repo, op, unbundler):
        self.repo = repo
        self.op = op
        self.unbundler = unbundler
        self.iterator = None
        self.count = 0
        self.current = None

    def __enter__(self):
        def func():
            itr = enumerate(self.unbundler.iterparts())
            for count, p in itr:
                self.count = count
                self.current = p
                yield p
                p.seek(0, 2)
                self.current = None
        self.iterator = func()
        return self.iterator

    def __exit__(self, type, exc, tb):
        if not self.iterator:
            return

        if exc:
            # If exiting or interrupted, do not attempt to seek the stream in
            # the finally block below. This makes abort faster.
            if (self.current and
                not isinstance(exc, (SystemExit, KeyboardInterrupt))):
                # consume the part content to not corrupt the stream.
                self.current.seek(0, 2)

            # Any exceptions seeking to the end of the bundle at this point are
            # almost certainly related to the underlying stream being bad.
            # And, chances are that the exception we're handling is related to
            # getting in that bad state. So, we swallow the seeking error and
            # re-raise the original error.
            seekerror = False
            try:
                for part in self.iterator:
                    # consume the bundle content
                    part.seek(0, 2)
            except Exception:
                seekerror = True

            # Small hack to let caller code distinguish exceptions from bundle2
            # processing from processing the old format. This is mostly needed
            # to handle different return codes to unbundle according to the
            # type of bundle. We should probably clean up or drop this return
            # code craziness in a future version.
            exc.duringunbundle2 = True
            salvaged = []
            replycaps = None
            if self.op.reply is not None:
                salvaged = self.op.reply.salvageoutput()
                replycaps = self.op.reply.capabilities
            exc._replycaps = replycaps
            exc._bundle2salvagedoutput = salvaged

            # Re-raising from a variable loses the original stack. So only use
            # that form if we need to.
            if seekerror:
                raise exc

        self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
                           self.count)

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying its effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op._gettransaction is None or op._gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))

    processparts(repo, op, unbundler)

    return op

def processparts(repo, op, unbundler):
    with partiterator(repo, op, unbundler) as parts:
        for part in parts:
            _processpart(op, part)

def _processchangegroup(op, cg, tr, source, url, **kwargs):
    ret = cg.apply(op.repo, tr, source, url, **kwargs)
    op.records.add('changegroup', {
        'return': ret,
    })
    return ret

def _gethandler(op, part):
    status = 'unknown' # used by debug output
    try:
        handler = parthandlermapping.get(part.type)
        if handler is None:
            status = 'unsupported-type'
            raise error.BundleUnknownFeatureError(parttype=part.type)
        indebug(op.ui, 'found a handler for part %s' % part.type)
        unknownparams = part.mandatorykeys - handler.params
        if unknownparams:
            unknownparams = list(unknownparams)
            unknownparams.sort()
            status = 'unsupported-params (%s)' % ', '.join(unknownparams)
            raise error.BundleUnknownFeatureError(parttype=part.type,
                                                  params=unknownparams)
        status = 'supported'
    except error.BundleUnknownFeatureError as exc:
        if part.mandatory: # mandatory parts
            raise
        indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
        return # skip to part processing
    finally:
        if op.ui.debugflag:
            msg = ['bundle2-input-part: "%s"' % part.type]
            if not part.mandatory:
                msg.append(' (advisory)')
            nbmp = len(part.mandatorykeys)
            nbap = len(part.params) - nbmp
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            msg.append(' %s\n' % status)
            op.ui.debug(''.join(msg))

    return handler

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    handler = _gethandler(op, part)
    if handler is None:
        return

    # handler is called outside the above try block so that we don't
    # risk catching KeyErrors from anything other than the
    # parthandlermapping lookup (any KeyError raised by handler()
    # itself represents a defect of a different variety).
    output = None
    if op.captureoutput and op.reply is not None:
        op.ui.pushbuffer(error=True, subproc=True)
        output = ''
    try:
        handler(op, part)
    finally:
        if output is not None:
            output = op.ui.popbuffer()
        if output:
            outpart = op.reply.newpart('output', data=output,
                                       mandatory=False)
            outpart.addparam(
                'in-reply-to', pycompat.bytestr(part.id), mandatory=False)

def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

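A round-trip sketch for the two caps helpers above::

    caps = {'HG20': [], 'changegroup': ['01', '02']}
    blob = encodecaps(caps)
    # blob == 'HG20\nchangegroup=01,02'
    assert decodecaps(blob) == {'HG20': [], 'changegroup': ['01', '02']}
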
bundletypes = {
    "": ("", 'UN'),       # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add a stream level parameter and `newpart`
    to populate it. Then call `getchunks` to retrieve all the binary chunks
    of data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError(r'empty parameter name')
        if name[0:1] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError(r'non letter first character: %s' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

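A usage sketch for assembling a bundle (assuming a `ui` object; the part
type and payload here are made up)::

    bundler = bundle20(ui)
    bundler.setcompression('BZ')        # adds the 'Compression' stream param
    part = bundler.newpart('output', data='hello', mandatory=False)
    raw = ''.join(bundler.getchunks())  # 'HG20' + params + parts + end marker
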
    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged


class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

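A usage sketch for the reading side (assuming a `ui` object and a file
produced by `bundle20.getchunks`; the file name is hypothetical)::

    fp = open('changes.hg', 'rb')       # hypothetical bundle file
    unbundler = getunbundler(ui, fp)    # reads and checks the 'HG20' magic
    for part in unbundler.iterparts():
        ui.write('%s (id=%d)\n' % (part.type, part.id))
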
742 class unbundle20(unpackermixin):
740 class unbundle20(unpackermixin):
743 """interpret a bundle2 stream
741 """interpret a bundle2 stream
744
742
745 This class is fed with a binary stream and yields parts through its
743 This class is fed with a binary stream and yields parts through its
746 `iterparts` methods."""
744 `iterparts` methods."""
747
745
748 _magicstring = 'HG20'
746 _magicstring = 'HG20'
749
747
750 def __init__(self, ui, fp):
748 def __init__(self, ui, fp):
751 """If header is specified, we do not read it out of the stream."""
749 """If header is specified, we do not read it out of the stream."""
752 self.ui = ui
750 self.ui = ui
753 self._compengine = util.compengines.forbundletype('UN')
751 self._compengine = util.compengines.forbundletype('UN')
754 self._compressed = None
752 self._compressed = None
755 super(unbundle20, self).__init__(fp)
753 super(unbundle20, self).__init__(fp)
756
754
757 @util.propertycache
755 @util.propertycache
758 def params(self):
756 def params(self):
759 """dictionary of stream level parameters"""
757 """dictionary of stream level parameters"""
760 indebug(self.ui, 'reading bundle2 stream parameters')
758 indebug(self.ui, 'reading bundle2 stream parameters')
761 params = {}
759 params = {}
762 paramssize = self._unpack(_fstreamparamsize)[0]
760 paramssize = self._unpack(_fstreamparamsize)[0]
763 if paramssize < 0:
761 if paramssize < 0:
764 raise error.BundleValueError('negative bundle param size: %i'
762 raise error.BundleValueError('negative bundle param size: %i'
765 % paramssize)
763 % paramssize)
766 if paramssize:
764 if paramssize:
767 params = self._readexact(paramssize)
765 params = self._readexact(paramssize)
768 params = self._processallparams(params)
766 params = self._processallparams(params)
769 return params
767 return params
770
768
771 def _processallparams(self, paramsblock):
769 def _processallparams(self, paramsblock):
772 """"""
770 """"""
773 params = util.sortdict()
771 params = util.sortdict()
774 for p in paramsblock.split(' '):
772 for p in paramsblock.split(' '):
775 p = p.split('=', 1)
773 p = p.split('=', 1)
776 p = [urlreq.unquote(i) for i in p]
774 p = [urlreq.unquote(i) for i in p]
777 if len(p) < 2:
775 if len(p) < 2:
778 p.append(None)
776 p.append(None)
779 self._processparam(*p)
777 self._processparam(*p)
780 params[p[0]] = p[1]
778 params[p[0]] = p[1]
781 return params
779 return params
782
780
783
781
784 def _processparam(self, name, value):
782 def _processparam(self, name, value):
785 """process a parameter, applying its effect if needed
783 """process a parameter, applying its effect if needed
786
784
787 Parameter starting with a lower case letter are advisory and will be
785 Parameter starting with a lower case letter are advisory and will be
788 ignored when unknown. Those starting with an upper case letter are
786 ignored when unknown. Those starting with an upper case letter are
789 mandatory and will this function will raise a KeyError when unknown.
787 mandatory and will this function will raise a KeyError when unknown.
790
788
791 Note: no option are currently supported. Any input will be either
789 Note: no option are currently supported. Any input will be either
792 ignored or failing.
790 ignored or failing.
793 """
791 """
794 if not name:
792 if not name:
795 raise ValueError(r'empty parameter name')
793 raise ValueError(r'empty parameter name')
796 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
794 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
797 raise ValueError(r'non letter first character: %s' % name)
795 raise ValueError(r'non letter first character: %s' % name)
798 try:
796 try:
799 handler = b2streamparamsmap[name.lower()]
797 handler = b2streamparamsmap[name.lower()]
800 except KeyError:
798 except KeyError:
801 if name[0:1].islower():
799 if name[0:1].islower():
802 indebug(self.ui, "ignoring unknown parameter %s" % name)
800 indebug(self.ui, "ignoring unknown parameter %s" % name)
803 else:
801 else:
804 raise error.BundleUnknownFeatureError(params=(name,))
802 raise error.BundleUnknownFeatureError(params=(name,))
805 else:
803 else:
806 handler(self, name, value)
804 handler(self, name, value)
807
805
808 def _forwardchunks(self):
806 def _forwardchunks(self):
809 """utility to transfer a bundle2 as binary
807 """utility to transfer a bundle2 as binary
810
808
811 This is made necessary by the fact the 'getbundle' command over 'ssh'
809 This is made necessary by the fact the 'getbundle' command over 'ssh'
812 have no way to know then the reply end, relying on the bundle to be
810 have no way to know then the reply end, relying on the bundle to be
813 interpreted to know its end. This is terrible and we are sorry, but we
811 interpreted to know its end. This is terrible and we are sorry, but we
814 needed to move forward to get general delta enabled.
812 needed to move forward to get general delta enabled.
815 """
813 """
816 yield self._magicstring
814 yield self._magicstring
817 assert 'params' not in vars(self)
815 assert 'params' not in vars(self)
818 paramssize = self._unpack(_fstreamparamsize)[0]
816 paramssize = self._unpack(_fstreamparamsize)[0]
819 if paramssize < 0:
817 if paramssize < 0:
820 raise error.BundleValueError('negative bundle param size: %i'
818 raise error.BundleValueError('negative bundle param size: %i'
821 % paramssize)
819 % paramssize)
822 yield _pack(_fstreamparamsize, paramssize)
820 yield _pack(_fstreamparamsize, paramssize)
823 if paramssize:
821 if paramssize:
824 params = self._readexact(paramssize)
822 params = self._readexact(paramssize)
825 self._processallparams(params)
823 self._processallparams(params)
826 yield params
824 yield params
827 assert self._compengine.bundletype == 'UN'
825 assert self._compengine.bundletype == 'UN'
828 # From there, payload might need to be decompressed
826 # From there, payload might need to be decompressed
829 self._fp = self._compengine.decompressorreader(self._fp)
827 self._fp = self._compengine.decompressorreader(self._fp)
830 emptycount = 0
828 emptycount = 0
831 while emptycount < 2:
829 while emptycount < 2:
832 # so we can brainlessly loop
830 # so we can brainlessly loop
833 assert _fpartheadersize == _fpayloadsize
831 assert _fpartheadersize == _fpayloadsize
834 size = self._unpack(_fpartheadersize)[0]
832 size = self._unpack(_fpartheadersize)[0]
835 yield _pack(_fpartheadersize, size)
833 yield _pack(_fpartheadersize, size)
836 if size:
834 if size:
837 emptycount = 0
835 emptycount = 0
838 else:
836 else:
839 emptycount += 1
837 emptycount += 1
840 continue
838 continue
841 if size == flaginterrupt:
839 if size == flaginterrupt:
842 continue
840 continue
843 elif size < 0:
841 elif size < 0:
844 raise error.BundleValueError('negative chunk size: %i')
842 raise error.BundleValueError('negative chunk size: %i')
845 yield self._readexact(size)
843 yield self._readexact(size)
846
844
847
845
848 def iterparts(self):
846 def iterparts(self):
849 """yield all parts contained in the stream"""
847 """yield all parts contained in the stream"""
850 # make sure param have been loaded
848 # make sure param have been loaded
851 self.params
849 self.params
852 # From there, payload need to be decompressed
850 # From there, payload need to be decompressed
853 self._fp = self._compengine.decompressorreader(self._fp)
851 self._fp = self._compengine.decompressorreader(self._fp)
854 indebug(self.ui, 'start extraction of bundle2 parts')
852 indebug(self.ui, 'start extraction of bundle2 parts')
855 headerblock = self._readpartheader()
853 headerblock = self._readpartheader()
856 while headerblock is not None:
854 while headerblock is not None:
857 part = unbundlepart(self.ui, headerblock, self._fp)
855 part = unbundlepart(self.ui, headerblock, self._fp)
858 yield part
856 yield part
859 # Seek to the end of the part to force it's consumption so the next
857 # Seek to the end of the part to force it's consumption so the next
860 # part can be read. But then seek back to the beginning so the
858 # part can be read. But then seek back to the beginning so the
861 # code consuming this generator has a part that starts at 0.
859 # code consuming this generator has a part that starts at 0.
862 part.seek(0, 2)
860 part.seek(0, 2)
863 part.seek(0)
861 part.seek(0)
864 headerblock = self._readpartheader()
862 headerblock = self._readpartheader()
865 indebug(self.ui, 'end of bundle2 stream')
863 indebug(self.ui, 'end of bundle2 stream')
866
864
867 def _readpartheader(self):
865 def _readpartheader(self):
868 """reads a part header size and return the bytes blob
866 """reads a part header size and return the bytes blob
869
867
870 returns None if empty"""
868 returns None if empty"""
871 headersize = self._unpack(_fpartheadersize)[0]
869 headersize = self._unpack(_fpartheadersize)[0]
872 if headersize < 0:
870 if headersize < 0:
873 raise error.BundleValueError('negative part header size: %i'
871 raise error.BundleValueError('negative part header size: %i'
874 % headersize)
872 % headersize)
875 indebug(self.ui, 'part header size: %i' % headersize)
873 indebug(self.ui, 'part header size: %i' % headersize)
876 if headersize:
874 if headersize:
877 return self._readexact(headersize)
875 return self._readexact(headersize)
878 return None
876 return None
879
877
880 def compressed(self):
878 def compressed(self):
881 self.params # load params
879 self.params # load params
882 return self._compressed
880 return self._compressed
883
881
884 def close(self):
882 def close(self):
885 """close underlying file"""
883 """close underlying file"""
886 if util.safehasattr(self._fp, 'close'):
884 if util.safehasattr(self._fp, 'close'):
887 return self._fp.close()
885 return self._fp.close()
888
886
889 formatmap = {'20': unbundle20}
887 formatmap = {'20': unbundle20}
890
888
891 b2streamparamsmap = {}
889 b2streamparamsmap = {}
892
890
893 def b2streamparamhandler(name):
891 def b2streamparamhandler(name):
894 """register a handler for a stream level parameter"""
892 """register a handler for a stream level parameter"""
895 def decorator(func):
893 def decorator(func):
896 assert name not in b2streamparamsmap
894 assert name not in b2streamparamsmap
897 b2streamparamsmap[name] = func
895 b2streamparamsmap[name] = func
898 return func
896 return func
899 return decorator
897 return decorator
900
898
901 @b2streamparamhandler('compression')
899 @b2streamparamhandler('compression')
902 def processcompression(unbundler, param, value):
900 def processcompression(unbundler, param, value):
903 """read compression parameter and install payload decompression"""
901 """read compression parameter and install payload decompression"""
904 if value not in util.compengines.supportedbundletypes:
902 if value not in util.compengines.supportedbundletypes:
905 raise error.BundleUnknownFeatureError(params=(param,),
903 raise error.BundleUnknownFeatureError(params=(param,),
906 values=(value,))
904 values=(value,))
907 unbundler._compengine = util.compengines.forbundletype(value)
905 unbundler._compengine = util.compengines.forbundletype(value)
908 if value is not None:
906 if value is not None:
909 unbundler._compressed = True
907 unbundler._compressed = True
910
908
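For illustration only, a hypothetical advisory stream-level parameter registered through the decorator above; the name 'fancyfeature' is invented and mirrors how 'compression' is wired into b2streamparamsmap:

    @b2streamparamhandler('fancyfeature')
    def processfancyfeature(unbundler, param, value):
        # toy handler: just record that the parameter was seen
        indebug(unbundler.ui, 'stream param %s=%s' % (param, value))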
911 class bundlepart(object):
909 class bundlepart(object):
912 """A bundle2 part contains application level payload
910 """A bundle2 part contains application level payload
913
911
914 The part `type` is used to route the part to the application level
912 The part `type` is used to route the part to the application level
915 handler.
913 handler.
916
914
917 The part payload is contained in ``part.data``. It could be raw bytes or a
915 The part payload is contained in ``part.data``. It could be raw bytes or a
918 generator of byte chunks.
916 generator of byte chunks.
919
917
920 You can add parameters to the part using the ``addparam`` method.
918 You can add parameters to the part using the ``addparam`` method.
921 Parameters can be either mandatory (default) or advisory. Remote side
919 Parameters can be either mandatory (default) or advisory. Remote side
922 should be able to safely ignore the advisory ones.
920 should be able to safely ignore the advisory ones.
923
921
924 Neither the data nor the parameters can be modified after generation has begun.
922 Neither the data nor the parameters can be modified after generation has begun.
925 """
923 """
926
924
927 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
925 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
928 data='', mandatory=True):
926 data='', mandatory=True):
929 validateparttype(parttype)
927 validateparttype(parttype)
930 self.id = None
928 self.id = None
931 self.type = parttype
929 self.type = parttype
932 self._data = data
930 self._data = data
933 self._mandatoryparams = list(mandatoryparams)
931 self._mandatoryparams = list(mandatoryparams)
934 self._advisoryparams = list(advisoryparams)
932 self._advisoryparams = list(advisoryparams)
935 # checking for duplicated entries
933 # checking for duplicated entries
936 self._seenparams = set()
934 self._seenparams = set()
937 for pname, __ in self._mandatoryparams + self._advisoryparams:
935 for pname, __ in self._mandatoryparams + self._advisoryparams:
938 if pname in self._seenparams:
936 if pname in self._seenparams:
939 raise error.ProgrammingError('duplicated params: %s' % pname)
937 raise error.ProgrammingError('duplicated params: %s' % pname)
940 self._seenparams.add(pname)
938 self._seenparams.add(pname)
941 # status of the part's generation:
939 # status of the part's generation:
942 # - None: not started,
940 # - None: not started,
943 # - False: currently generated,
941 # - False: currently generated,
944 # - True: generation done.
942 # - True: generation done.
945 self._generated = None
943 self._generated = None
946 self.mandatory = mandatory
944 self.mandatory = mandatory
947
945
948 def __repr__(self):
946 def __repr__(self):
949 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
947 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
950 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
948 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
951 % (cls, id(self), self.id, self.type, self.mandatory))
949 % (cls, id(self), self.id, self.type, self.mandatory))
952
950
953 def copy(self):
951 def copy(self):
954 """return a copy of the part
952 """return a copy of the part
955
953
956 The new part has the very same content but no partid assigned yet.
954 The new part has the very same content but no partid assigned yet.
957 Parts with generated data cannot be copied."""
955 Parts with generated data cannot be copied."""
958 assert not util.safehasattr(self.data, 'next')
956 assert not util.safehasattr(self.data, 'next')
959 return self.__class__(self.type, self._mandatoryparams,
957 return self.__class__(self.type, self._mandatoryparams,
960 self._advisoryparams, self._data, self.mandatory)
958 self._advisoryparams, self._data, self.mandatory)
961
959
962 # methods used to define the part content
960 # methods used to define the part content
963 @property
961 @property
964 def data(self):
962 def data(self):
965 return self._data
963 return self._data
966
964
967 @data.setter
965 @data.setter
968 def data(self, data):
966 def data(self, data):
969 if self._generated is not None:
967 if self._generated is not None:
970 raise error.ReadOnlyPartError('part is being generated')
968 raise error.ReadOnlyPartError('part is being generated')
971 self._data = data
969 self._data = data
972
970
973 @property
971 @property
974 def mandatoryparams(self):
972 def mandatoryparams(self):
975 # make it an immutable tuple to force people through ``addparam``
973 # make it an immutable tuple to force people through ``addparam``
976 return tuple(self._mandatoryparams)
974 return tuple(self._mandatoryparams)
977
975
978 @property
976 @property
979 def advisoryparams(self):
977 def advisoryparams(self):
980 # make it an immutable tuple to force people through ``addparam``
978 # make it an immutable tuple to force people through ``addparam``
981 return tuple(self._advisoryparams)
979 return tuple(self._advisoryparams)
982
980
983 def addparam(self, name, value='', mandatory=True):
981 def addparam(self, name, value='', mandatory=True):
984 """add a parameter to the part
982 """add a parameter to the part
985
983
986 If 'mandatory' is set to True, the remote handler must claim support
984 If 'mandatory' is set to True, the remote handler must claim support
987 for this parameter or the unbundling will be aborted.
985 for this parameter or the unbundling will be aborted.
988
986
989 The 'name' and 'value' cannot exceed 255 bytes each.
987 The 'name' and 'value' cannot exceed 255 bytes each.
990 """
988 """
991 if self._generated is not None:
989 if self._generated is not None:
992 raise error.ReadOnlyPartError('part is being generated')
990 raise error.ReadOnlyPartError('part is being generated')
993 if name in self._seenparams:
991 if name in self._seenparams:
994 raise ValueError('duplicated params: %s' % name)
992 raise ValueError('duplicated params: %s' % name)
995 self._seenparams.add(name)
993 self._seenparams.add(name)
996 params = self._advisoryparams
994 params = self._advisoryparams
997 if mandatory:
995 if mandatory:
998 params = self._mandatoryparams
996 params = self._mandatoryparams
999 params.append((name, value))
997 params.append((name, value))
1000
998
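A hedged usage sketch for the API above; the part type 'test:song' and both parameter names are made up. `addparam` rejects duplicates and anything added once generation has started:

    part = bundlepart('test:song', data='Philanthropy')
    part.addparam('randomparam', 'a token', mandatory=True)
    part.addparam('colour', 'blue', mandatory=False)
    assert part.mandatoryparams == (('randomparam', 'a token'),)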
1001 # methods used to generate the bundle2 stream
999 # methods used to generate the bundle2 stream
1002 def getchunks(self, ui):
1000 def getchunks(self, ui):
1003 if self._generated is not None:
1001 if self._generated is not None:
1004 raise error.ProgrammingError('part can only be consumed once')
1002 raise error.ProgrammingError('part can only be consumed once')
1005 self._generated = False
1003 self._generated = False
1006
1004
1007 if ui.debugflag:
1005 if ui.debugflag:
1008 msg = ['bundle2-output-part: "%s"' % self.type]
1006 msg = ['bundle2-output-part: "%s"' % self.type]
1009 if not self.mandatory:
1007 if not self.mandatory:
1010 msg.append(' (advisory)')
1008 msg.append(' (advisory)')
1011 nbmp = len(self.mandatoryparams)
1009 nbmp = len(self.mandatoryparams)
1012 nbap = len(self.advisoryparams)
1010 nbap = len(self.advisoryparams)
1013 if nbmp or nbap:
1011 if nbmp or nbap:
1014 msg.append(' (params:')
1012 msg.append(' (params:')
1015 if nbmp:
1013 if nbmp:
1016 msg.append(' %i mandatory' % nbmp)
1014 msg.append(' %i mandatory' % nbmp)
1017 if nbap:
1015 if nbap:
1018 msg.append(' %i advisory' % nbap)
1016 msg.append(' %i advisory' % nbap)
1019 msg.append(')')
1017 msg.append(')')
1020 if not self.data:
1018 if not self.data:
1021 msg.append(' empty payload')
1019 msg.append(' empty payload')
1022 elif (util.safehasattr(self.data, 'next')
1020 elif (util.safehasattr(self.data, 'next')
1023 or util.safehasattr(self.data, '__next__')):
1021 or util.safehasattr(self.data, '__next__')):
1024 msg.append(' streamed payload')
1022 msg.append(' streamed payload')
1025 else:
1023 else:
1026 msg.append(' %i bytes payload' % len(self.data))
1024 msg.append(' %i bytes payload' % len(self.data))
1027 msg.append('\n')
1025 msg.append('\n')
1028 ui.debug(''.join(msg))
1026 ui.debug(''.join(msg))
1029
1027
1030 #### header
1028 #### header
1031 if self.mandatory:
1029 if self.mandatory:
1032 parttype = self.type.upper()
1030 parttype = self.type.upper()
1033 else:
1031 else:
1034 parttype = self.type.lower()
1032 parttype = self.type.lower()
1035 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1033 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1036 ## parttype
1034 ## parttype
1037 header = [_pack(_fparttypesize, len(parttype)),
1035 header = [_pack(_fparttypesize, len(parttype)),
1038 parttype, _pack(_fpartid, self.id),
1036 parttype, _pack(_fpartid, self.id),
1039 ]
1037 ]
1040 ## parameters
1038 ## parameters
1041 # count
1039 # count
1042 manpar = self.mandatoryparams
1040 manpar = self.mandatoryparams
1043 advpar = self.advisoryparams
1041 advpar = self.advisoryparams
1044 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1042 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1045 # size
1043 # size
1046 parsizes = []
1044 parsizes = []
1047 for key, value in manpar:
1045 for key, value in manpar:
1048 parsizes.append(len(key))
1046 parsizes.append(len(key))
1049 parsizes.append(len(value))
1047 parsizes.append(len(value))
1050 for key, value in advpar:
1048 for key, value in advpar:
1051 parsizes.append(len(key))
1049 parsizes.append(len(key))
1052 parsizes.append(len(value))
1050 parsizes.append(len(value))
1053 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1051 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1054 header.append(paramsizes)
1052 header.append(paramsizes)
1055 # key, value
1053 # key, value
1056 for key, value in manpar:
1054 for key, value in manpar:
1057 header.append(key)
1055 header.append(key)
1058 header.append(value)
1056 header.append(value)
1059 for key, value in advpar:
1057 for key, value in advpar:
1060 header.append(key)
1058 header.append(key)
1061 header.append(value)
1059 header.append(value)
1062 ## finalize header
1060 ## finalize header
1063 try:
1061 try:
1064 headerchunk = ''.join(header)
1062 headerchunk = ''.join(header)
1065 except TypeError:
1063 except TypeError:
1066 raise TypeError(r'Found a non-bytes trying to '
1064 raise TypeError(r'Found a non-bytes trying to '
1067 r'build bundle part header: %r' % header)
1065 r'build bundle part header: %r' % header)
1068 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1066 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1069 yield _pack(_fpartheadersize, len(headerchunk))
1067 yield _pack(_fpartheadersize, len(headerchunk))
1070 yield headerchunk
1068 yield headerchunk
1071 ## payload
1069 ## payload
1072 try:
1070 try:
1073 for chunk in self._payloadchunks():
1071 for chunk in self._payloadchunks():
1074 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1072 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1075 yield _pack(_fpayloadsize, len(chunk))
1073 yield _pack(_fpayloadsize, len(chunk))
1076 yield chunk
1074 yield chunk
1077 except GeneratorExit:
1075 except GeneratorExit:
1078 # GeneratorExit means that nobody is listening for our
1076 # GeneratorExit means that nobody is listening for our
1079 # results anyway, so just bail quickly rather than trying
1077 # results anyway, so just bail quickly rather than trying
1080 # to produce an error part.
1078 # to produce an error part.
1081 ui.debug('bundle2-generatorexit\n')
1079 ui.debug('bundle2-generatorexit\n')
1082 raise
1080 raise
1083 except BaseException as exc:
1081 except BaseException as exc:
1084 bexc = util.forcebytestr(exc)
1082 bexc = util.forcebytestr(exc)
1085 # backup exception data for later
1083 # backup exception data for later
1086 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1084 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1087 % bexc)
1085 % bexc)
1088 tb = sys.exc_info()[2]
1086 tb = sys.exc_info()[2]
1089 msg = 'unexpected error: %s' % bexc
1087 msg = 'unexpected error: %s' % bexc
1090 interpart = bundlepart('error:abort', [('message', msg)],
1088 interpart = bundlepart('error:abort', [('message', msg)],
1091 mandatory=False)
1089 mandatory=False)
1092 interpart.id = 0
1090 interpart.id = 0
1093 yield _pack(_fpayloadsize, -1)
1091 yield _pack(_fpayloadsize, -1)
1094 for chunk in interpart.getchunks(ui=ui):
1092 for chunk in interpart.getchunks(ui=ui):
1095 yield chunk
1093 yield chunk
1096 outdebug(ui, 'closing payload chunk')
1094 outdebug(ui, 'closing payload chunk')
1097 # abort current part payload
1095 # abort current part payload
1098 yield _pack(_fpayloadsize, 0)
1096 yield _pack(_fpayloadsize, 0)
1099 pycompat.raisewithtb(exc, tb)
1097 pycompat.raisewithtb(exc, tb)
1100 # end of payload
1098 # end of payload
1101 outdebug(ui, 'closing payload chunk')
1099 outdebug(ui, 'closing payload chunk')
1102 yield _pack(_fpayloadsize, 0)
1100 yield _pack(_fpayloadsize, 0)
1103 self._generated = True
1101 self._generated = True
1104
1102
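For orientation (editorial note): `getchunks` frames a part on the wire as a length-prefixed header followed by length-prefixed payload chunks, closed by a zero-length chunk; -1 is reserved as the interrupt flag handled further down. A part whose payload is 'hello' therefore serializes roughly as:

    (_pack(_fpartheadersize, len(headerchunk)) + headerchunk
     + _pack(_fpayloadsize, 5) + 'hello'
     + _pack(_fpayloadsize, 0))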
1105 def _payloadchunks(self):
1103 def _payloadchunks(self):
1106 """yield chunks of the part payload
1104 """yield chunks of the part payload
1107
1105
1108 Exists to handle the different methods to provide data to a part."""
1106 Exists to handle the different methods to provide data to a part."""
1109 # we only support fixed size data now.
1107 # we only support fixed size data now.
1110 # This will be improved in the future.
1108 # This will be improved in the future.
1111 if (util.safehasattr(self.data, 'next')
1109 if (util.safehasattr(self.data, 'next')
1112 or util.safehasattr(self.data, '__next__')):
1110 or util.safehasattr(self.data, '__next__')):
1113 buff = util.chunkbuffer(self.data)
1111 buff = util.chunkbuffer(self.data)
1114 chunk = buff.read(preferedchunksize)
1112 chunk = buff.read(preferedchunksize)
1115 while chunk:
1113 while chunk:
1116 yield chunk
1114 yield chunk
1117 chunk = buff.read(preferedchunksize)
1115 chunk = buff.read(preferedchunksize)
1118 elif len(self.data):
1116 elif len(self.data):
1119 yield self.data
1117 yield self.data
1120
1118
1121
1119
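Matching the two branches above (editorial sketch): a part payload may be a fixed byte string or a generator, the latter being buffered and re-cut to `preferedchunksize` through `util.chunkbuffer`:

    part.data = 'raw bytes payload'           # fixed-size: sent as one chunk
    part.data = iter(['chunk a', 'chunk b'])  # streamed: re-chunked lazily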
1122 flaginterrupt = -1
1120 flaginterrupt = -1
1123
1121
1124 class interrupthandler(unpackermixin):
1122 class interrupthandler(unpackermixin):
1125 """read one part and process it with restricted capability
1123 """read one part and process it with restricted capability
1126
1124
1127 This allows transmitting exceptions raised on the producer side during part
1125 This allows transmitting exceptions raised on the producer side during part
1128 iteration while the consumer is reading a part.
1126 iteration while the consumer is reading a part.
1129
1127
1130 Parts processed in this manner only have access to a ui object."""
1128 Parts processed in this manner only have access to a ui object."""
1131
1129
1132 def __init__(self, ui, fp):
1130 def __init__(self, ui, fp):
1133 super(interrupthandler, self).__init__(fp)
1131 super(interrupthandler, self).__init__(fp)
1134 self.ui = ui
1132 self.ui = ui
1135
1133
1136 def _readpartheader(self):
1134 def _readpartheader(self):
1137 """read a part header size and return the bytes blob
1135 """read a part header size and return the bytes blob
1138
1136
1139 returns None if empty"""
1137 returns None if empty"""
1140 headersize = self._unpack(_fpartheadersize)[0]
1138 headersize = self._unpack(_fpartheadersize)[0]
1141 if headersize < 0:
1139 if headersize < 0:
1142 raise error.BundleValueError('negative part header size: %i'
1140 raise error.BundleValueError('negative part header size: %i'
1143 % headersize)
1141 % headersize)
1144 indebug(self.ui, 'part header size: %i\n' % headersize)
1142 indebug(self.ui, 'part header size: %i\n' % headersize)
1145 if headersize:
1143 if headersize:
1146 return self._readexact(headersize)
1144 return self._readexact(headersize)
1147 return None
1145 return None
1148
1146
1149 def __call__(self):
1147 def __call__(self):
1150
1148
1151 self.ui.debug('bundle2-input-stream-interrupt:'
1149 self.ui.debug('bundle2-input-stream-interrupt:'
1152 ' opening out of band context\n')
1150 ' opening out of band context\n')
1153 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1151 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1154 headerblock = self._readpartheader()
1152 headerblock = self._readpartheader()
1155 if headerblock is None:
1153 if headerblock is None:
1156 indebug(self.ui, 'no part found during interruption.')
1154 indebug(self.ui, 'no part found during interruption.')
1157 return
1155 return
1158 part = unbundlepart(self.ui, headerblock, self._fp)
1156 part = unbundlepart(self.ui, headerblock, self._fp)
1159 op = interruptoperation(self.ui)
1157 op = interruptoperation(self.ui)
1160 hardabort = False
1158 hardabort = False
1161 try:
1159 try:
1162 _processpart(op, part)
1160 _processpart(op, part)
1163 except (SystemExit, KeyboardInterrupt):
1161 except (SystemExit, KeyboardInterrupt):
1164 hardabort = True
1162 hardabort = True
1165 raise
1163 raise
1166 finally:
1164 finally:
1167 if not hardabort:
1165 if not hardabort:
1168 part.seek(0, 2)
1166 part.seek(0, 2)
1169 self.ui.debug('bundle2-input-stream-interrupt:'
1167 self.ui.debug('bundle2-input-stream-interrupt:'
1170 ' closing out of band context\n')
1168 ' closing out of band context\n')
1171
1169
1172 class interruptoperation(object):
1170 class interruptoperation(object):
1173 """A limited operation to be used by part handlers during interruption
1171 """A limited operation to be used by part handlers during interruption
1174
1172
1175 It only has access to a ui object.
1173 It only has access to a ui object.
1176 """
1174 """
1177
1175
1178 def __init__(self, ui):
1176 def __init__(self, ui):
1179 self.ui = ui
1177 self.ui = ui
1180 self.reply = None
1178 self.reply = None
1181 self.captureoutput = False
1179 self.captureoutput = False
1182
1180
1183 @property
1181 @property
1184 def repo(self):
1182 def repo(self):
1185 raise error.ProgrammingError('no repo access from stream interruption')
1183 raise error.ProgrammingError('no repo access from stream interruption')
1186
1184
1187 def gettransaction(self):
1185 def gettransaction(self):
1188 raise TransactionUnavailable('no repo access from stream interruption')
1186 raise TransactionUnavailable('no repo access from stream interruption')
1189
1187
1190 class unbundlepart(unpackermixin):
1188 class unbundlepart(unpackermixin):
1191 """a bundle part read from a bundle"""
1189 """a bundle part read from a bundle"""
1192
1190
1193 def __init__(self, ui, header, fp):
1191 def __init__(self, ui, header, fp):
1194 super(unbundlepart, self).__init__(fp)
1192 super(unbundlepart, self).__init__(fp)
1195 self._seekable = (util.safehasattr(fp, 'seek') and
1193 self._seekable = (util.safehasattr(fp, 'seek') and
1196 util.safehasattr(fp, 'tell'))
1194 util.safehasattr(fp, 'tell'))
1197 self.ui = ui
1195 self.ui = ui
1198 # unbundle state attr
1196 # unbundle state attr
1199 self._headerdata = header
1197 self._headerdata = header
1200 self._headeroffset = 0
1198 self._headeroffset = 0
1201 self._initialized = False
1199 self._initialized = False
1202 self.consumed = False
1200 self.consumed = False
1203 # part data
1201 # part data
1204 self.id = None
1202 self.id = None
1205 self.type = None
1203 self.type = None
1206 self.mandatoryparams = None
1204 self.mandatoryparams = None
1207 self.advisoryparams = None
1205 self.advisoryparams = None
1208 self.params = None
1206 self.params = None
1209 self.mandatorykeys = ()
1207 self.mandatorykeys = ()
1210 self._payloadstream = None
1208 self._payloadstream = None
1211 self._readheader()
1209 self._readheader()
1212 self._mandatory = None
1210 self._mandatory = None
1213 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1211 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1214 self._pos = 0
1212 self._pos = 0
1215
1213
1216 def _fromheader(self, size):
1214 def _fromheader(self, size):
1217 """return the next <size> bytes from the header"""
1215 """return the next <size> bytes from the header"""
1218 offset = self._headeroffset
1216 offset = self._headeroffset
1219 data = self._headerdata[offset:(offset + size)]
1217 data = self._headerdata[offset:(offset + size)]
1220 self._headeroffset = offset + size
1218 self._headeroffset = offset + size
1221 return data
1219 return data
1222
1220
1223 def _unpackheader(self, format):
1221 def _unpackheader(self, format):
1224 """read given format from header
1222 """read given format from header
1225
1223
1226 This automatically computes the size of the format to read."""
1224 This automatically computes the size of the format to read."""
1227 data = self._fromheader(struct.calcsize(format))
1225 data = self._fromheader(struct.calcsize(format))
1228 return _unpack(format, data)
1226 return _unpack(format, data)
1229
1227
1230 def _initparams(self, mandatoryparams, advisoryparams):
1228 def _initparams(self, mandatoryparams, advisoryparams):
1231 """internal function to set up all logic-related parameters"""
1229 """internal function to set up all logic-related parameters"""
1232 # make it read only to prevent people touching it by mistake.
1230 # make it read only to prevent people touching it by mistake.
1233 self.mandatoryparams = tuple(mandatoryparams)
1231 self.mandatoryparams = tuple(mandatoryparams)
1234 self.advisoryparams = tuple(advisoryparams)
1232 self.advisoryparams = tuple(advisoryparams)
1235 # user friendly UI
1233 # user friendly UI
1236 self.params = util.sortdict(self.mandatoryparams)
1234 self.params = util.sortdict(self.mandatoryparams)
1237 self.params.update(self.advisoryparams)
1235 self.params.update(self.advisoryparams)
1238 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1236 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1239
1237
1240 def _payloadchunks(self, chunknum=0):
1238 def _payloadchunks(self, chunknum=0):
1241 '''seek to specified chunk and start yielding data'''
1239 '''seek to specified chunk and start yielding data'''
1242 if len(self._chunkindex) == 0:
1240 if len(self._chunkindex) == 0:
1243 assert chunknum == 0, 'Must start with chunk 0'
1241 assert chunknum == 0, 'Must start with chunk 0'
1244 self._chunkindex.append((0, self._tellfp()))
1242 self._chunkindex.append((0, self._tellfp()))
1245 else:
1243 else:
1246 assert chunknum < len(self._chunkindex), \
1244 assert chunknum < len(self._chunkindex), \
1247 'Unknown chunk %d' % chunknum
1245 'Unknown chunk %d' % chunknum
1248 self._seekfp(self._chunkindex[chunknum][1])
1246 self._seekfp(self._chunkindex[chunknum][1])
1249
1247
1250 pos = self._chunkindex[chunknum][0]
1248 pos = self._chunkindex[chunknum][0]
1251 payloadsize = self._unpack(_fpayloadsize)[0]
1249 payloadsize = self._unpack(_fpayloadsize)[0]
1252 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1250 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1253 while payloadsize:
1251 while payloadsize:
1254 if payloadsize == flaginterrupt:
1252 if payloadsize == flaginterrupt:
1255 # interruption detection, the handler will now read a
1253 # interruption detection, the handler will now read a
1256 # single part and process it.
1254 # single part and process it.
1257 interrupthandler(self.ui, self._fp)()
1255 interrupthandler(self.ui, self._fp)()
1258 elif payloadsize < 0:
1256 elif payloadsize < 0:
1259 msg = 'negative payload chunk size: %i' % payloadsize
1257 msg = 'negative payload chunk size: %i' % payloadsize
1260 raise error.BundleValueError(msg)
1258 raise error.BundleValueError(msg)
1261 else:
1259 else:
1262 result = self._readexact(payloadsize)
1260 result = self._readexact(payloadsize)
1263 chunknum += 1
1261 chunknum += 1
1264 pos += payloadsize
1262 pos += payloadsize
1265 if chunknum == len(self._chunkindex):
1263 if chunknum == len(self._chunkindex):
1266 self._chunkindex.append((pos, self._tellfp()))
1264 self._chunkindex.append((pos, self._tellfp()))
1267 yield result
1265 yield result
1268 payloadsize = self._unpack(_fpayloadsize)[0]
1266 payloadsize = self._unpack(_fpayloadsize)[0]
1269 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1267 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1270
1268
1271 def _findchunk(self, pos):
1269 def _findchunk(self, pos):
1272 '''for a given payload position, return a chunk number and offset'''
1270 '''for a given payload position, return a chunk number and offset'''
1273 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1271 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1274 if ppos == pos:
1272 if ppos == pos:
1275 return chunk, 0
1273 return chunk, 0
1276 elif ppos > pos:
1274 elif ppos > pos:
1277 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1275 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1278 raise ValueError('Unknown chunk')
1276 raise ValueError('Unknown chunk')
1279
1277
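A worked example of the lookup above (editorial): with `_chunkindex == [(0, f0), (4096, f1), (8192, f2)]`, payload offsets resolve as:

    _findchunk(4096)   # -> (1, 0): exact chunk boundary
    _findchunk(5000)   # -> (1, 904): 904 bytes into chunk 1
    _findchunk(9000)   # raises ValueError('Unknown chunk')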
1280 def _readheader(self):
1278 def _readheader(self):
1281 """read the header and setup the object"""
1279 """read the header and setup the object"""
1282 typesize = self._unpackheader(_fparttypesize)[0]
1280 typesize = self._unpackheader(_fparttypesize)[0]
1283 self.type = self._fromheader(typesize)
1281 self.type = self._fromheader(typesize)
1284 indebug(self.ui, 'part type: "%s"' % self.type)
1282 indebug(self.ui, 'part type: "%s"' % self.type)
1285 self.id = self._unpackheader(_fpartid)[0]
1283 self.id = self._unpackheader(_fpartid)[0]
1286 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1284 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1287 # extract mandatory bit from type
1285 # extract mandatory bit from type
1288 self.mandatory = (self.type != self.type.lower())
1286 self.mandatory = (self.type != self.type.lower())
1289 self.type = self.type.lower()
1287 self.type = self.type.lower()
1290 ## reading parameters
1288 ## reading parameters
1291 # param count
1289 # param count
1292 mancount, advcount = self._unpackheader(_fpartparamcount)
1290 mancount, advcount = self._unpackheader(_fpartparamcount)
1293 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1291 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1294 # param size
1292 # param size
1295 fparamsizes = _makefpartparamsizes(mancount + advcount)
1293 fparamsizes = _makefpartparamsizes(mancount + advcount)
1296 paramsizes = self._unpackheader(fparamsizes)
1294 paramsizes = self._unpackheader(fparamsizes)
1297 # make it a list of pairs again
1295 # make it a list of pairs again
1298 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1296 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1299 # split mandatory from advisory
1297 # split mandatory from advisory
1300 mansizes = paramsizes[:mancount]
1298 mansizes = paramsizes[:mancount]
1301 advsizes = paramsizes[mancount:]
1299 advsizes = paramsizes[mancount:]
1302 # retrieve param value
1300 # retrieve param value
1303 manparams = []
1301 manparams = []
1304 for key, value in mansizes:
1302 for key, value in mansizes:
1305 manparams.append((self._fromheader(key), self._fromheader(value)))
1303 manparams.append((self._fromheader(key), self._fromheader(value)))
1306 advparams = []
1304 advparams = []
1307 for key, value in advsizes:
1305 for key, value in advsizes:
1308 advparams.append((self._fromheader(key), self._fromheader(value)))
1306 advparams.append((self._fromheader(key), self._fromheader(value)))
1309 self._initparams(manparams, advparams)
1307 self._initparams(manparams, advparams)
1310 ## part payload
1308 ## part payload
1311 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1309 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1312 # the header is fully read; record that the part is initialized
1310 # the header is fully read; record that the part is initialized
1313 self._initialized = True
1311 self._initialized = True
1314
1312
1315 def read(self, size=None):
1313 def read(self, size=None):
1316 """read payload data"""
1314 """read payload data"""
1317 if not self._initialized:
1315 if not self._initialized:
1318 self._readheader()
1316 self._readheader()
1319 if size is None:
1317 if size is None:
1320 data = self._payloadstream.read()
1318 data = self._payloadstream.read()
1321 else:
1319 else:
1322 data = self._payloadstream.read(size)
1320 data = self._payloadstream.read(size)
1323 self._pos += len(data)
1321 self._pos += len(data)
1324 if size is None or len(data) < size:
1322 if size is None or len(data) < size:
1325 if not self.consumed and self._pos:
1323 if not self.consumed and self._pos:
1326 self.ui.debug('bundle2-input-part: total payload size %i\n'
1324 self.ui.debug('bundle2-input-part: total payload size %i\n'
1327 % self._pos)
1325 % self._pos)
1328 self.consumed = True
1326 self.consumed = True
1329 return data
1327 return data
1330
1328
1331 def tell(self):
1329 def tell(self):
1332 return self._pos
1330 return self._pos
1333
1331
1334 def seek(self, offset, whence=0):
1332 def seek(self, offset, whence=0):
1335 if whence == 0:
1333 if whence == 0:
1336 newpos = offset
1334 newpos = offset
1337 elif whence == 1:
1335 elif whence == 1:
1338 newpos = self._pos + offset
1336 newpos = self._pos + offset
1339 elif whence == 2:
1337 elif whence == 2:
1340 if not self.consumed:
1338 if not self.consumed:
1341 self.read()
1339 self.read()
1342 newpos = self._chunkindex[-1][0] - offset
1340 newpos = self._chunkindex[-1][0] - offset
1343 else:
1341 else:
1344 raise ValueError('Unknown whence value: %r' % (whence,))
1342 raise ValueError('Unknown whence value: %r' % (whence,))
1345
1343
1346 if newpos > self._chunkindex[-1][0] and not self.consumed:
1344 if newpos > self._chunkindex[-1][0] and not self.consumed:
1347 self.read()
1345 self.read()
1348 if not 0 <= newpos <= self._chunkindex[-1][0]:
1346 if not 0 <= newpos <= self._chunkindex[-1][0]:
1349 raise ValueError('Offset out of range')
1347 raise ValueError('Offset out of range')
1350
1348
1351 if self._pos != newpos:
1349 if self._pos != newpos:
1352 chunk, internaloffset = self._findchunk(newpos)
1350 chunk, internaloffset = self._findchunk(newpos)
1353 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1351 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1354 adjust = self.read(internaloffset)
1352 adjust = self.read(internaloffset)
1355 if len(adjust) != internaloffset:
1353 if len(adjust) != internaloffset:
1356 raise error.Abort(_('Seek failed'))
1354 raise error.Abort(_('Seek failed'))
1357 self._pos = newpos
1355 self._pos = newpos
1358
1356
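These seek semantics mirror file objects over the payload (editorial note); the two calls below are exactly the dance `iterparts` performs above to skip to the next part without losing the current one:

    part.seek(0, 2)  # consume to the end so the next header is reachable
    part.seek(0)     # rewind; handlers see the payload from offset 0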
1359 def _seekfp(self, offset, whence=0):
1357 def _seekfp(self, offset, whence=0):
1360 """move the underlying file pointer
1358 """move the underlying file pointer
1361
1359
1362 This method is meant for internal usage by the bundle2 protocol only.
1360 This method is meant for internal usage by the bundle2 protocol only.
1363 It directly manipulates the low-level stream, including bundle2-level
1361 It directly manipulates the low-level stream, including bundle2-level
1364 instructions.
1362 instructions.
1365
1363
1366 Do not use it to implement higher-level logic or methods."""
1364 Do not use it to implement higher-level logic or methods."""
1367 if self._seekable:
1365 if self._seekable:
1368 return self._fp.seek(offset, whence)
1366 return self._fp.seek(offset, whence)
1369 else:
1367 else:
1370 raise NotImplementedError(_('File pointer is not seekable'))
1368 raise NotImplementedError(_('File pointer is not seekable'))
1371
1369
1372 def _tellfp(self):
1370 def _tellfp(self):
1373 """return the file offset, or None if file is not seekable
1371 """return the file offset, or None if file is not seekable
1374
1372
1375 This method is meant for internal usage by the bundle2 protocol only.
1373 This method is meant for internal usage by the bundle2 protocol only.
1376 It directly manipulates the low-level stream, including bundle2-level
1374 It directly manipulates the low-level stream, including bundle2-level
1377 instructions.
1375 instructions.
1378
1376
1379 Do not use it to implement higher-level logic or methods."""
1377 Do not use it to implement higher-level logic or methods."""
1380 if self._seekable:
1378 if self._seekable:
1381 try:
1379 try:
1382 return self._fp.tell()
1380 return self._fp.tell()
1383 except IOError as e:
1381 except IOError as e:
1384 if e.errno == errno.ESPIPE:
1382 if e.errno == errno.ESPIPE:
1385 self._seekable = False
1383 self._seekable = False
1386 else:
1384 else:
1387 raise
1385 raise
1388 return None
1386 return None
1389
1387
1390 # These are only the static capabilities.
1388 # These are only the static capabilities.
1391 # Check the 'getrepocaps' function for the rest.
1389 # Check the 'getrepocaps' function for the rest.
1392 capabilities = {'HG20': (),
1390 capabilities = {'HG20': (),
1393 'error': ('abort', 'unsupportedcontent', 'pushraced',
1391 'error': ('abort', 'unsupportedcontent', 'pushraced',
1394 'pushkey'),
1392 'pushkey'),
1395 'listkeys': (),
1393 'listkeys': (),
1396 'pushkey': (),
1394 'pushkey': (),
1397 'digests': tuple(sorted(util.DIGESTS.keys())),
1395 'digests': tuple(sorted(util.DIGESTS.keys())),
1398 'remote-changegroup': ('http', 'https'),
1396 'remote-changegroup': ('http', 'https'),
1399 'hgtagsfnodes': (),
1397 'hgtagsfnodes': (),
1400 }
1398 }
1401
1399
1402 def getrepocaps(repo, allowpushback=False):
1400 def getrepocaps(repo, allowpushback=False):
1403 """return the bundle2 capabilities for a given repo
1401 """return the bundle2 capabilities for a given repo
1404
1402
1405 Exists to allow extensions (like evolution) to mutate the capabilities.
1403 Exists to allow extensions (like evolution) to mutate the capabilities.
1406 """
1404 """
1407 caps = capabilities.copy()
1405 caps = capabilities.copy()
1408 caps['changegroup'] = tuple(sorted(
1406 caps['changegroup'] = tuple(sorted(
1409 changegroup.supportedincomingversions(repo)))
1407 changegroup.supportedincomingversions(repo)))
1410 if obsolete.isenabled(repo, obsolete.exchangeopt):
1408 if obsolete.isenabled(repo, obsolete.exchangeopt):
1411 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1409 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1412 caps['obsmarkers'] = supportedformat
1410 caps['obsmarkers'] = supportedformat
1413 if allowpushback:
1411 if allowpushback:
1414 caps['pushback'] = ()
1412 caps['pushback'] = ()
1415 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1413 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1416 if cpmode == 'check-related':
1414 if cpmode == 'check-related':
1417 caps['checkheads'] = ('related',)
1415 caps['checkheads'] = ('related',)
1418 return caps
1416 return caps
1419
1417
1420 def bundle2caps(remote):
1418 def bundle2caps(remote):
1421 """return the bundle capabilities of a peer as dict"""
1419 """return the bundle capabilities of a peer as dict"""
1422 raw = remote.capable('bundle2')
1420 raw = remote.capable('bundle2')
1423 if not raw and raw != '':
1421 if not raw and raw != '':
1424 return {}
1422 return {}
1425 capsblob = urlreq.unquote(remote.capable('bundle2'))
1423 capsblob = urlreq.unquote(remote.capable('bundle2'))
1426 return decodecaps(capsblob)
1424 return decodecaps(capsblob)
1427
1425
1428 def obsmarkersversion(caps):
1426 def obsmarkersversion(caps):
1429 """extract the list of supported obsmarkers versions from a bundle2caps dict
1427 """extract the list of supported obsmarkers versions from a bundle2caps dict
1430 """
1428 """
1431 obscaps = caps.get('obsmarkers', ())
1429 obscaps = caps.get('obsmarkers', ())
1432 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1430 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1433
1431
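Worked example (editorial):

    >>> obsmarkersversion({'obsmarkers': ('V0', 'V1')})
    [0, 1]
    >>> obsmarkersversion({'listkeys': ()})
    []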
1434 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1432 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1435 vfs=None, compression=None, compopts=None):
1433 vfs=None, compression=None, compopts=None):
1436 if bundletype.startswith('HG10'):
1434 if bundletype.startswith('HG10'):
1437 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1435 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1438 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1436 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1439 compression=compression, compopts=compopts)
1437 compression=compression, compopts=compopts)
1440 elif not bundletype.startswith('HG20'):
1438 elif not bundletype.startswith('HG20'):
1441 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1439 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1442
1440
1443 caps = {}
1441 caps = {}
1444 if 'obsolescence' in opts:
1442 if 'obsolescence' in opts:
1445 caps['obsmarkers'] = ('V1',)
1443 caps['obsmarkers'] = ('V1',)
1446 bundle = bundle20(ui, caps)
1444 bundle = bundle20(ui, caps)
1447 bundle.setcompression(compression, compopts)
1445 bundle.setcompression(compression, compopts)
1448 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1446 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1449 chunkiter = bundle.getchunks()
1447 chunkiter = bundle.getchunks()
1450
1448
1451 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1449 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1452
1450
1453 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1451 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1454 # We should eventually reconcile this logic with the one behind
1452 # We should eventually reconcile this logic with the one behind
1455 # 'exchange.getbundle2partsgenerator'.
1453 # 'exchange.getbundle2partsgenerator'.
1456 #
1454 #
1457 # The types of input from 'getbundle' and 'writenewbundle' are a bit
1455 # The types of input from 'getbundle' and 'writenewbundle' are a bit
1458 # different right now. So we keep them separated for now for the sake of
1456 # different right now. So we keep them separated for now for the sake of
1459 # simplicity.
1457 # simplicity.
1460
1458
1461 # we always want a changegroup in such a bundle
1459 # we always want a changegroup in such a bundle
1462 cgversion = opts.get('cg.version')
1460 cgversion = opts.get('cg.version')
1463 if cgversion is None:
1461 if cgversion is None:
1464 cgversion = changegroup.safeversion(repo)
1462 cgversion = changegroup.safeversion(repo)
1465 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1463 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1466 part = bundler.newpart('changegroup', data=cg.getchunks())
1464 part = bundler.newpart('changegroup', data=cg.getchunks())
1467 part.addparam('version', cg.version)
1465 part.addparam('version', cg.version)
1468 if 'clcount' in cg.extras:
1466 if 'clcount' in cg.extras:
1469 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1467 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1470 mandatory=False)
1468 mandatory=False)
1471 if opts.get('phases') and repo.revs('%ln and secret()',
1469 if opts.get('phases') and repo.revs('%ln and secret()',
1472 outgoing.missingheads):
1470 outgoing.missingheads):
1473 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1471 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1474
1472
1475 addparttagsfnodescache(repo, bundler, outgoing)
1473 addparttagsfnodescache(repo, bundler, outgoing)
1476
1474
1477 if opts.get('obsolescence', False):
1475 if opts.get('obsolescence', False):
1478 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1476 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1479 buildobsmarkerspart(bundler, obsmarkers)
1477 buildobsmarkerspart(bundler, obsmarkers)
1480
1478
1481 if opts.get('phases', False):
1479 if opts.get('phases', False):
1482 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1480 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1483 phasedata = []
1481 phasedata = phases.binaryencode(headsbyphase)
1484 for phase in phases.allphases:
1482 bundler.newpart('phase-heads', data=phasedata)
1485 for head in headsbyphase[phase]:
1486 phasedata.append(_fphasesentry.pack(phase, head))
1487 bundler.newpart('phase-heads', data=''.join(phasedata))
1488
1483
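This hunk is the point of the changeset: the inline loop building `phasedata` moves into `phases.binaryencode`. A sketch of the extracted helper, reconstructed from the removed lines (assuming `_fphasesentry` is a `struct.Struct('>i20s')`, i.e. a big-endian phase number followed by a 20-byte node):

    def binaryencode(headsbyphase):
        """encode a 'phase -> heads' mapping into a binary stream

        (sketch matching the inlined code removed above)
        """
        phasedata = []
        for phase in allphases:
            for head in headsbyphase[phase]:
                phasedata.append(_fphasesentry.pack(phase, head))
        return ''.join(phasedata)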
1489 def addparttagsfnodescache(repo, bundler, outgoing):
1484 def addparttagsfnodescache(repo, bundler, outgoing):
1490 # we include the tags fnode cache for the bundle changeset
1485 # we include the tags fnode cache for the bundle changeset
1491 # (as an optional part)
1486 # (as an optional part)
1492 cache = tags.hgtagsfnodescache(repo.unfiltered())
1487 cache = tags.hgtagsfnodescache(repo.unfiltered())
1493 chunks = []
1488 chunks = []
1494
1489
1495 # .hgtags fnodes are only relevant for head changesets. While we could
1490 # .hgtags fnodes are only relevant for head changesets. While we could
1496 # transfer values for all known nodes, there will likely be little to
1491 # transfer values for all known nodes, there will likely be little to
1497 # no benefit.
1492 # no benefit.
1498 #
1493 #
1499 # We don't bother using a generator to produce output data because
1494 # We don't bother using a generator to produce output data because
1500 # a) we only have 40 bytes per head and even esoteric numbers of heads
1495 # a) we only have 40 bytes per head and even esoteric numbers of heads
1501 # consume little memory (1M heads is 40MB) b) we don't want to send the
1496 # consume little memory (1M heads is 40MB) b) we don't want to send the
1502 # part if we don't have entries and knowing if we have entries requires
1497 # part if we don't have entries and knowing if we have entries requires
1503 # cache lookups.
1498 # cache lookups.
1504 for node in outgoing.missingheads:
1499 for node in outgoing.missingheads:
1505 # Don't compute missing, as this may slow down serving.
1500 # Don't compute missing, as this may slow down serving.
1506 fnode = cache.getfnode(node, computemissing=False)
1501 fnode = cache.getfnode(node, computemissing=False)
1507 if fnode is not None:
1502 if fnode is not None:
1508 chunks.extend([node, fnode])
1503 chunks.extend([node, fnode])
1509
1504
1510 if chunks:
1505 if chunks:
1511 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1506 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1512
1507
1513 def buildobsmarkerspart(bundler, markers):
1508 def buildobsmarkerspart(bundler, markers):
1514 """add an obsmarker part to the bundler with <markers>
1509 """add an obsmarker part to the bundler with <markers>
1515
1510
1516 No part is created if markers is empty.
1511 No part is created if markers is empty.
1517 Raises ValueError if the bundler doesn't support any known obsmarker format.
1512 Raises ValueError if the bundler doesn't support any known obsmarker format.
1518 """
1513 """
1519 if not markers:
1514 if not markers:
1520 return None
1515 return None
1521
1516
1522 remoteversions = obsmarkersversion(bundler.capabilities)
1517 remoteversions = obsmarkersversion(bundler.capabilities)
1523 version = obsolete.commonversion(remoteversions)
1518 version = obsolete.commonversion(remoteversions)
1524 if version is None:
1519 if version is None:
1525 raise ValueError('bundler does not support common obsmarker format')
1520 raise ValueError('bundler does not support common obsmarker format')
1526 stream = obsolete.encodemarkers(markers, True, version=version)
1521 stream = obsolete.encodemarkers(markers, True, version=version)
1527 return bundler.newpart('obsmarkers', data=stream)
1522 return bundler.newpart('obsmarkers', data=stream)
1528
1523
1529 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1524 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1530 compopts=None):
1525 compopts=None):
1531 """Write a bundle file and return its filename.
1526 """Write a bundle file and return its filename.
1532
1527
1533 Existing files will not be overwritten.
1528 Existing files will not be overwritten.
1534 If no filename is specified, a temporary file is created.
1529 If no filename is specified, a temporary file is created.
1535 bz2 compression can be turned off.
1530 bz2 compression can be turned off.
1536 The bundle file will be deleted in case of errors.
1531 The bundle file will be deleted in case of errors.
1537 """
1532 """
1538
1533
1539 if bundletype == "HG20":
1534 if bundletype == "HG20":
1540 bundle = bundle20(ui)
1535 bundle = bundle20(ui)
1541 bundle.setcompression(compression, compopts)
1536 bundle.setcompression(compression, compopts)
1542 part = bundle.newpart('changegroup', data=cg.getchunks())
1537 part = bundle.newpart('changegroup', data=cg.getchunks())
1543 part.addparam('version', cg.version)
1538 part.addparam('version', cg.version)
1544 if 'clcount' in cg.extras:
1539 if 'clcount' in cg.extras:
1545 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1540 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1546 mandatory=False)
1541 mandatory=False)
1547 chunkiter = bundle.getchunks()
1542 chunkiter = bundle.getchunks()
1548 else:
1543 else:
1549 # compression argument is only for the bundle2 case
1544 # compression argument is only for the bundle2 case
1550 assert compression is None
1545 assert compression is None
1551 if cg.version != '01':
1546 if cg.version != '01':
1552 raise error.Abort(_('old bundle types only support v1 '
1547 raise error.Abort(_('old bundle types only support v1 '
1553 'changegroups'))
1548 'changegroups'))
1554 header, comp = bundletypes[bundletype]
1549 header, comp = bundletypes[bundletype]
1555 if comp not in util.compengines.supportedbundletypes:
1550 if comp not in util.compengines.supportedbundletypes:
1556 raise error.Abort(_('unknown stream compression type: %s')
1551 raise error.Abort(_('unknown stream compression type: %s')
1557 % comp)
1552 % comp)
1558 compengine = util.compengines.forbundletype(comp)
1553 compengine = util.compengines.forbundletype(comp)
1559 def chunkiter():
1554 def chunkiter():
1560 yield header
1555 yield header
1561 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1556 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1562 yield chunk
1557 yield chunk
1563 chunkiter = chunkiter()
1558 chunkiter = chunkiter()
1564
1559
1565 # parse the changegroup data, otherwise we will block
1560 # parse the changegroup data, otherwise we will block
1566 # in case of sshrepo because we don't know the end of the stream
1561 # in case of sshrepo because we don't know the end of the stream
1567 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1562 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1568
1563
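Hedged usage sketch: given a changegroup `cg` built as in `writenewbundle` above, both container generations share this entry point ('HG10BZ' is assumed to be one of the `bundletypes` keys; the bundle1 path requires a '01' changegroup and no compression argument):

    writebundle(ui, cg, 'backup.hg', 'HG20')    # bundle2 container
    writebundle(ui, cg, 'backup.hg', 'HG10BZ')  # bundle1, bzip2-compressed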
1569 def combinechangegroupresults(op):
1564 def combinechangegroupresults(op):
1570 """logic to combine 0 or more addchangegroup results into one"""
1565 """logic to combine 0 or more addchangegroup results into one"""
1571 results = [r.get('return', 0)
1566 results = [r.get('return', 0)
1572 for r in op.records['changegroup']]
1567 for r in op.records['changegroup']]
1573 changedheads = 0
1568 changedheads = 0
1574 result = 1
1569 result = 1
1575 for ret in results:
1570 for ret in results:
1576 # If any changegroup result is 0, return 0
1571 # If any changegroup result is 0, return 0
1577 if ret == 0:
1572 if ret == 0:
1578 result = 0
1573 result = 0
1579 break
1574 break
1580 if ret < -1:
1575 if ret < -1:
1581 changedheads += ret + 1
1576 changedheads += ret + 1
1582 elif ret > 1:
1577 elif ret > 1:
1583 changedheads += ret - 1
1578 changedheads += ret - 1
1584 if changedheads > 0:
1579 if changedheads > 0:
1585 result = 1 + changedheads
1580 result = 1 + changedheads
1586 elif changedheads < 0:
1581 elif changedheads < 0:
1587 result = -1 + changedheads
1582 result = -1 + changedheads
1588 return result
1583 return result
1589
1584
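Worked example of the combination rules above (editorial): head-count deltas accumulate around the +1/-1 success baseline.

    # results == [2, 3]  -> changedheads = (2-1) + (3-1) = 3 -> returns 4
    # results == [1, -2] -> changedheads = 0 + (-2+1) = -1   -> returns -2
    # any 0 in results   -> returns 0 immediately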
1590 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1585 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1591 'targetphase'))
1586 'targetphase'))
1592 def handlechangegroup(op, inpart):
1587 def handlechangegroup(op, inpart):
1593 """apply a changegroup part on the repo
1588 """apply a changegroup part on the repo
1594
1589
1595 This is a very early implementation that will be massively reworked before
1590 This is a very early implementation that will be massively reworked before
1596 being inflicted on any end-user.
1591 being inflicted on any end-user.
1597 """
1592 """
1598 tr = op.gettransaction()
1593 tr = op.gettransaction()
1599 unpackerversion = inpart.params.get('version', '01')
1594 unpackerversion = inpart.params.get('version', '01')
1600 # We should raise an appropriate exception here
1595 # We should raise an appropriate exception here
1601 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1596 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1602 # the source and url passed here are overwritten by the one contained in
1597 # the source and url passed here are overwritten by the one contained in
1603 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1598 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1604 nbchangesets = None
1599 nbchangesets = None
1605 if 'nbchanges' in inpart.params:
1600 if 'nbchanges' in inpart.params:
1606 nbchangesets = int(inpart.params.get('nbchanges'))
1601 nbchangesets = int(inpart.params.get('nbchanges'))
1607 if ('treemanifest' in inpart.params and
1602 if ('treemanifest' in inpart.params and
1608 'treemanifest' not in op.repo.requirements):
1603 'treemanifest' not in op.repo.requirements):
1609 if len(op.repo.changelog) != 0:
1604 if len(op.repo.changelog) != 0:
1610 raise error.Abort(_(
1605 raise error.Abort(_(
1611 "bundle contains tree manifests, but local repo is "
1606 "bundle contains tree manifests, but local repo is "
1612 "non-empty and does not use tree manifests"))
1607 "non-empty and does not use tree manifests"))
1613 op.repo.requirements.add('treemanifest')
1608 op.repo.requirements.add('treemanifest')
1614 op.repo._applyopenerreqs()
1609 op.repo._applyopenerreqs()
1615 op.repo._writerequirements()
1610 op.repo._writerequirements()
1616 extrakwargs = {}
1611 extrakwargs = {}
1617 targetphase = inpart.params.get('targetphase')
1612 targetphase = inpart.params.get('targetphase')
1618 if targetphase is not None:
1613 if targetphase is not None:
1619 extrakwargs['targetphase'] = int(targetphase)
1614 extrakwargs['targetphase'] = int(targetphase)
1620 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1615 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1621 expectedtotal=nbchangesets, **extrakwargs)
1616 expectedtotal=nbchangesets, **extrakwargs)
1622 if op.reply is not None:
1617 if op.reply is not None:
1623 # This is definitely not the final form of this
1618 # This is definitely not the final form of this
1624 # return. But one needs to start somewhere.
1619 # return. But one needs to start somewhere.
1625 part = op.reply.newpart('reply:changegroup', mandatory=False)
1620 part = op.reply.newpart('reply:changegroup', mandatory=False)
1626 part.addparam(
1621 part.addparam(
1627 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1622 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1628 part.addparam('return', '%i' % ret, mandatory=False)
1623 part.addparam('return', '%i' % ret, mandatory=False)
1629 assert not inpart.read()
1624 assert not inpart.read()
1630
1625
1631 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1626 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1632 ['digest:%s' % k for k in util.DIGESTS.keys()])
1627 ['digest:%s' % k for k in util.DIGESTS.keys()])
1633 @parthandler('remote-changegroup', _remotechangegroupparams)
1628 @parthandler('remote-changegroup', _remotechangegroupparams)
1634 def handleremotechangegroup(op, inpart):
1629 def handleremotechangegroup(op, inpart):
1635 """apply a bundle10 on the repo, given an url and validation information
1630 """apply a bundle10 on the repo, given an url and validation information
1636
1631
1637 All the information about the remote bundle to import is given as
1632 All the information about the remote bundle to import is given as
1638 parameters. The parameters include:
1633 parameters. The parameters include:
1639 - url: the url to the bundle10.
1634 - url: the url to the bundle10.
1640 - size: the bundle10 file size. It is used to validate what was
1635 - size: the bundle10 file size. It is used to validate what was
1641 retrieved by the client matches the server knowledge about the bundle.
1636 retrieved by the client matches the server knowledge about the bundle.
1642 - digests: a space separated list of the digest types provided as
1637 - digests: a space separated list of the digest types provided as
1643 parameters.
1638 parameters.
1644 - digest:<digest-type>: the hexadecimal representation of the digest with
1639 - digest:<digest-type>: the hexadecimal representation of the digest with
1645 that name. Like the size, it is used to validate what was retrieved by
1640 that name. Like the size, it is used to validate what was retrieved by
1646 the client matches what the server knows about the bundle.
1641 the client matches what the server knows about the bundle.
1647
1642
1648 When multiple digest types are given, all of them are checked.
1643 When multiple digest types are given, all of them are checked.
1649 """
1644 """
1645 try:
1646 raw_url = inpart.params['url']
1647 except KeyError:
1648 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1649 parsed_url = util.url(raw_url)
1650 if parsed_url.scheme not in capabilities['remote-changegroup']:
1651 raise error.Abort(_('remote-changegroup does not support %s urls') %
1652 parsed_url.scheme)
1653
1654 try:
1655 size = int(inpart.params['size'])
1656 except ValueError:
1657 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1658 % 'size')
1659 except KeyError:
1660 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1661
1662 digests = {}
1663 for typ in inpart.params.get('digests', '').split():
1664 param = 'digest:%s' % typ
1665 try:
1666 value = inpart.params[param]
1667 except KeyError:
1668 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1669 param)
1670 digests[typ] = value
1671
1672 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1673
1674 tr = op.gettransaction()
1675 from . import exchange
1676 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1677 if not isinstance(cg, changegroup.cg1unpacker):
1678 raise error.Abort(_('%s: not a bundle version 1.0') %
1679 util.hidepassword(raw_url))
1680 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1681 if op.reply is not None:
1682 # This is definitely not the final form of this
1683 # return. But one needs to start somewhere.
1684 part = op.reply.newpart('reply:changegroup')
1685 part.addparam(
1686 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1687 part.addparam('return', '%i' % ret, mandatory=False)
1688 try:
1689 real_part.validate()
1690 except error.Abort as e:
1691 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1692 (util.hidepassword(raw_url), str(e)))
1693 assert not inpart.read()
1694
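For illustration only (this sketch is not part of the changeset), a sender could emit a matching 'remote-changegroup' part through the same bundler newpart/addparam API used throughout this module; the URL, size and digest values below are hypothetical placeholders:

# Hedged sketch: emitting a 'remote-changegroup' part. 'bundler' is
# assumed to be a bundle20 instance; all parameter values are made up.
part = bundler.newpart('remote-changegroup')
part.addparam('url', 'https://example.com/bundles/abc.hg')  # hypothetical URL
part.addparam('size', '12345')                              # bundle10 size in bytes
part.addparam('digests', 'sha1')                            # space separated list
part.addparam('digest:sha1', '0' * 40)                      # placeholder hex digest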
1695 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1696 def handlereplychangegroup(op, inpart):
1697 ret = int(inpart.params['return'])
1698 replyto = int(inpart.params['in-reply-to'])
1699 op.records.add('changegroup', {'return': ret}, replyto)
1700
1701 @parthandler('check:heads')
1702 def handlecheckheads(op, inpart):
1703 """check that the heads of the repo did not change
1704
1705 This is used to detect a push race when using unbundle.
1706 This replaces the "heads" argument of unbundle."""
1707 h = inpart.read(20)
1708 heads = []
1709 while len(h) == 20:
1710 heads.append(h)
1711 h = inpart.read(20)
1712 assert not h
1713 # Trigger a transaction so that we are guaranteed to have the lock now.
1714 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1715 op.gettransaction()
1716 if sorted(heads) != sorted(op.repo.heads()):
1717 raise error.PushRaced('repository changed while pushing - '
1718 'please try again')
1719
1720 @parthandler('check:updated-heads')
1721 def handlecheckupdatedheads(op, inpart):
1722 """check for race on the heads touched by a push
1723
1724 This is similar to 'check:heads' but focuses on the heads actually updated
1725 during the push. If other activities happen on unrelated heads, they are
1726 ignored.
1727
1728 This allows servers with high traffic to avoid push contention as long as
1729 only unrelated parts of the graph are involved."""
1730 h = inpart.read(20)
1731 heads = []
1732 while len(h) == 20:
1733 heads.append(h)
1734 h = inpart.read(20)
1735 assert not h
1736 # trigger a transaction so that we are guaranteed to have the lock now.
1737 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1738 op.gettransaction()
1739
1740 currentheads = set()
1741 for ls in op.repo.branchmap().itervalues():
1742 currentheads.update(ls)
1743
1744 for h in heads:
1745 if h not in currentheads:
1746 raise error.PushRaced('repository changed while pushing - '
1747 'please try again')
1748
1749 @parthandler('output')
1750 def handleoutput(op, inpart):
1751 """forward output captured on the server to the client"""
1752 for line in inpart.read().splitlines():
1753 op.ui.status(_('remote: %s\n') % line)
1754
1755 @parthandler('replycaps')
1756 def handlereplycaps(op, inpart):
1757 """Notify that a reply bundle should be created
1758
1759 The payload contains the capabilities information for the reply"""
1760 caps = decodecaps(inpart.read())
1761 if op.reply is None:
1762 op.reply = bundle20(op.ui, caps)
1763
1764 class AbortFromPart(error.Abort):
1765 """Sub-class of Abort that denotes an error from a bundle2 part."""
1766
1767 @parthandler('error:abort', ('message', 'hint'))
1768 def handleerrorabort(op, inpart):
1769 """Used to transmit abort error over the wire"""
1770 raise AbortFromPart(inpart.params['message'],
1771 hint=inpart.params.get('hint'))
1772
1773 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1774 'in-reply-to'))
1775 def handleerrorpushkey(op, inpart):
1776 """Used to transmit failure of a mandatory pushkey over the wire"""
1777 kwargs = {}
1778 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1779 value = inpart.params.get(name)
1780 if value is not None:
1781 kwargs[name] = value
1782 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1783
1784 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1785 def handleerrorunsupportedcontent(op, inpart):
1786 """Used to transmit unknown content error over the wire"""
1787 kwargs = {}
1788 parttype = inpart.params.get('parttype')
1789 if parttype is not None:
1790 kwargs['parttype'] = parttype
1791 params = inpart.params.get('params')
1792 if params is not None:
1793 kwargs['params'] = params.split('\0')
1794
1795 raise error.BundleUnknownFeatureError(**kwargs)
1796
1797 @parthandler('error:pushraced', ('message',))
1798 def handleerrorpushraced(op, inpart):
1799 """Used to transmit push race error over the wire"""
1800 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1801
1802 @parthandler('listkeys', ('namespace',))
1803 def handlelistkeys(op, inpart):
1804 """retrieve pushkey namespace content stored in a bundle2"""
1805 namespace = inpart.params['namespace']
1806 r = pushkey.decodekeys(inpart.read())
1807 op.records.add('listkeys', (namespace, r))
1808
1809 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1810 def handlepushkey(op, inpart):
1811 """process a pushkey request"""
1812 dec = pushkey.decode
1813 namespace = dec(inpart.params['namespace'])
1814 key = dec(inpart.params['key'])
1815 old = dec(inpart.params['old'])
1816 new = dec(inpart.params['new'])
1817 # Grab the transaction to ensure that we have the lock before performing the
1818 # pushkey.
1819 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1820 op.gettransaction()
1821 ret = op.repo.pushkey(namespace, key, old, new)
1822 record = {'namespace': namespace,
1823 'key': key,
1824 'old': old,
1825 'new': new}
1826 op.records.add('pushkey', record)
1827 if op.reply is not None:
1828 rpart = op.reply.newpart('reply:pushkey')
1829 rpart.addparam(
1830 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1831 rpart.addparam('return', '%i' % ret, mandatory=False)
1832 if inpart.mandatory and not ret:
1833 kwargs = {}
1834 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1835 if key in inpart.params:
1836 kwargs[key] = inpart.params[key]
1837 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1838
1839 def _readphaseheads(inpart):
1840 headsbyphase = [[] for i in phases.allphases]
1846 entrysize = _fphasesentry.size
1841 entrysize = phases._fphasesentry.size
1842 while True:
1843 entry = inpart.read(entrysize)
1844 if len(entry) < entrysize:
1845 if entry:
1846 raise error.Abort(_('bad phase-heads bundle part'))
1847 break
1853 phase, node = _fphasesentry.unpack(entry)
1848 phase, node = phases._fphasesentry.unpack(entry)
1849 headsbyphase[phase].append(node)
1850 return headsbyphase
1851
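For illustration only (not part of the changeset), here is a minimal, self-contained sketch of decoding such a phase-heads payload with the same '>i20s' layout; io.BytesIO stands in for the bundle2 part object, and the node values are made-up placeholders:

# Hedged sketch: each record is a big-endian int32 phase followed by a
# 20-byte binary node, i.e. 24 bytes total.
import io
import struct

_entry = struct.Struct('>i20s')
payload = _entry.pack(1, b'\x11' * 20) + _entry.pack(2, b'\x22' * 20)
reader = io.BytesIO(payload)
headsbyphase = [[], [], []]            # public, draft, secret
while True:
    entry = reader.read(_entry.size)
    if len(entry) < _entry.size:
        break                          # the real code also rejects trailing bytes
    phase, node = _entry.unpack(entry)
    headsbyphase[phase].append(node)
# headsbyphase[1] now holds one draft head, headsbyphase[2] one secret head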
1852 @parthandler('phase-heads')
1853 def handlephases(op, inpart):
1854 """apply phases from bundle part to repo"""
1855 headsbyphase = _readphaseheads(inpart)
1856 phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
1857 op.records.add('phase-heads', {})
1858
1859 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1860 def handlepushkeyreply(op, inpart):
1861 """retrieve the result of a pushkey request"""
1862 ret = int(inpart.params['return'])
1863 partid = int(inpart.params['in-reply-to'])
1864 op.records.add('pushkey', {'return': ret}, partid)
1865
1866 @parthandler('obsmarkers')
1867 def handleobsmarker(op, inpart):
1868 """add a stream of obsmarkers to the repo"""
1869 tr = op.gettransaction()
1870 markerdata = inpart.read()
1871 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
1872 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1873 % len(markerdata))
1874 # The mergemarkers call will crash if marker creation is not enabled.
1875 # We want to avoid this if the part is advisory.
1876 if not inpart.mandatory and op.repo.obsstore.readonly:
1877 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1878 return
1879 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1880 op.repo.invalidatevolatilesets()
1881 if new:
1882 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1883 op.records.add('obsmarkers', {'new': new})
1884 if op.reply is not None:
1885 rpart = op.reply.newpart('reply:obsmarkers')
1886 rpart.addparam(
1887 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1888 rpart.addparam('new', '%i' % new, mandatory=False)
1889
1890
1891 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1892 def handleobsmarkerreply(op, inpart):
1893 """retrieve the result of an obsmarkers part"""
1894 ret = int(inpart.params['new'])
1895 partid = int(inpart.params['in-reply-to'])
1896 op.records.add('obsmarkers', {'new': ret}, partid)
1897
1898 @parthandler('hgtagsfnodes')
1899 def handlehgtagsfnodes(op, inpart):
1900 """Applies .hgtags fnodes cache entries to the local repo.
1901
1902 Payload is pairs of 20 byte changeset nodes and filenodes.
1903 """
1904 # Grab the transaction so we ensure that we have the lock at this point.
1905 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1906 op.gettransaction()
1907 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1908
1909 count = 0
1910 while True:
1911 node = inpart.read(20)
1912 fnode = inpart.read(20)
1913 if len(node) < 20 or len(fnode) < 20:
1914 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1915 break
1916 cache.setfnode(node, fnode)
1917 count += 1
1918
1919 cache.write()
1920 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1921
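For illustration only (not part of the changeset), the payload this loop consumes is a concatenation of fixed-width 40-byte records, each a changeset node followed by its .hgtags filenode; the values below are made-up placeholders:

# Hedged sketch: building and splitting an hgtagsfnodes payload.
node1, fnode1 = b'\xaa' * 20, b'\xbb' * 20
node2, fnode2 = b'\xcc' * 20, b'\xdd' * 20
payload = node1 + fnode1 + node2 + fnode2   # 2 records * 40 bytes

pairs = [(payload[i:i + 20], payload[i + 20:i + 40])
         for i in range(0, len(payload), 40)]
assert pairs[0] == (node1, fnode1)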
1922 @parthandler('pushvars')
1923 def bundle2getvars(op, part):
1924 '''unbundle a bundle2 containing shellvars on the server'''
1925 # An option to disable unbundling on server-side for security reasons
1926 if op.ui.configbool('push', 'pushvars.server'):
1927 hookargs = {}
1928 for key, value in part.advisoryparams:
1929 key = key.upper()
1930 # We want pushed variables to have USERVAR_ prepended so we know
1931 # they came from the --pushvar flag.
1932 key = "USERVAR_" + key
1933 hookargs[key] = value
1934 op.addhookargs(hookargs)
@@ -1,596 +1,611 @@
1 """ Mercurial phases support code
1 """ Mercurial phases support code
2
2
3 ---
3 ---
4
4
5 Copyright 2011 Pierre-Yves David <pierre-yves.david@ens-lyon.org>
5 Copyright 2011 Pierre-Yves David <pierre-yves.david@ens-lyon.org>
6 Logilab SA <contact@logilab.fr>
6 Logilab SA <contact@logilab.fr>
7 Augie Fackler <durin42@gmail.com>
7 Augie Fackler <durin42@gmail.com>
8
8
9 This software may be used and distributed according to the terms
9 This software may be used and distributed according to the terms
10 of the GNU General Public License version 2 or any later version.
10 of the GNU General Public License version 2 or any later version.
11
11
12 ---
12 ---
13
13
14 This module implements most phase logic in mercurial.
14 This module implements most phase logic in mercurial.
15
15
16
16
17 Basic Concept
17 Basic Concept
18 =============
18 =============
19
19
20 A 'changeset phase' is an indicator that tells us how a changeset is
20 A 'changeset phase' is an indicator that tells us how a changeset is
21 manipulated and communicated. The details of each phase is described
21 manipulated and communicated. The details of each phase is described
22 below, here we describe the properties they have in common.
22 below, here we describe the properties they have in common.
23
23
24 Like bookmarks, phases are not stored in history and thus are not
24 Like bookmarks, phases are not stored in history and thus are not
25 permanent and leave no audit trail.
25 permanent and leave no audit trail.
26
26
27 First, no changeset can be in two phases at once. Phases are ordered,
27 First, no changeset can be in two phases at once. Phases are ordered,
28 so they can be considered from lowest to highest. The default, lowest
28 so they can be considered from lowest to highest. The default, lowest
29 phase is 'public' - this is the normal phase of existing changesets. A
29 phase is 'public' - this is the normal phase of existing changesets. A
30 child changeset can not be in a lower phase than its parents.
30 child changeset can not be in a lower phase than its parents.
31
31
32 These phases share a hierarchy of traits:
32 These phases share a hierarchy of traits:
33
33
34 immutable shared
34 immutable shared
35 public: X X
35 public: X X
36 draft: X
36 draft: X
37 secret:
37 secret:
38
38
39 Local commits are draft by default.
39 Local commits are draft by default.
40
40
41 Phase Movement and Exchange
42 ===========================
43
44 Phase data is exchanged by pushkey on pull and push. Some servers have
45 a publish option set; we call such a server a "publishing server".
46 Pushing a draft changeset to a publishing server changes the phase to
47 public.
48
49 A small list of facts/rules defines the exchange of phases:
50
51 * old client never changes server states
52 * pull never changes server states
53 * publish and old server changesets are seen as public by client
54 * any secret changeset seen in another repository is lowered to at
55 least draft
56
57 Here is the final table summing up the 49 possible use cases of phase
58 exchange:
59
60 server
61 old publish non-publish
62 N X N D P N D P
63 old client
64 pull
65 N - X/X - X/D X/P - X/D X/P
66 X - X/X - X/D X/P - X/D X/P
67 push
68 X X/X X/X X/P X/P X/P X/D X/D X/P
69 new client
70 pull
71 N - P/X - P/D P/P - D/D P/P
72 D - P/X - P/D P/P - D/D P/P
73 P - P/X - P/D P/P - P/D P/P
74 push
75 D P/X P/X P/P P/P P/P D/D D/D P/P
76 P P/X P/X P/P P/P P/P P/P P/P P/P
77
78 Legend:
79
80 A/B = final state on client / state on server
81
82 * N = new/not present,
83 * P = public,
84 * D = draft,
85 * X = not tracked (i.e., the old client or server has no internal
86 way of recording the phase.)
87
88 passive = only pushes
89
90
91 A cell here can be read like this:
92
93 "When a new client pushes a draft changeset (D) to a publishing
94 server where it's not present (N), it's marked public on both
95 sides (P/P)."
96
97 Note: old clients behave as publishing servers with draft-only content
98 - other people see it as public
99 - content is pushed as draft
100
101 """
102
103 from __future__ import absolute_import
104
105 import errno
106 import struct
107
108 from .i18n import _
109 from .node import (
110 bin,
111 hex,
112 nullid,
113 nullrev,
114 short,
115 )
116 from . import (
117 error,
118 smartset,
119 txnutil,
120 util,
121 )
122
123 _fphasesentry = struct.Struct('>i20s')
124
125 allphases = public, draft, secret = range(3)
126 trackedphases = allphases[1:]
127 phasenames = ['public', 'draft', 'secret']
128
129 def _readroots(repo, phasedefaults=None):
130 """Read phase roots from disk
131
132 phasedefaults is a list of fn(repo, roots) callables, which are
133 executed if the phase roots file does not exist. When phases are
134 being initialized on an existing repository, this could be used to
135 set selected changesets' phase to something other than public.
136
137 Return (roots, dirty) where dirty is true if roots differ from
138 what is being stored.
139 """
140 repo = repo.unfiltered()
141 dirty = False
142 roots = [set() for i in allphases]
143 try:
144 f, pending = txnutil.trypending(repo.root, repo.svfs, 'phaseroots')
145 try:
146 for line in f:
147 phase, nh = line.split()
148 roots[int(phase)].add(bin(nh))
149 finally:
150 f.close()
151 except IOError as inst:
152 if inst.errno != errno.ENOENT:
153 raise
154 if phasedefaults:
155 for f in phasedefaults:
156 roots = f(repo, roots)
157 dirty = True
158 return roots, dirty
159
160 def binaryencode(phasemapping):
161 """encode a 'phase -> nodes' mapping into a binary stream
162
163 Since phases are integers, the mapping is actually a python list:
164 [[PUBLIC_HEADS], [DRAFTS_HEADS], [SECRET_HEADS]]
165 """
166 binarydata = []
167 for phase, nodes in enumerate(phasemapping):
168 for head in nodes:
169 binarydata.append(_fphasesentry.pack(phase, head))
170 return ''.join(binarydata)
171
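A minimal usage sketch of the new function (not part of the changeset), assuming the Python 2 context of this codebase where str is bytes; the node values are made-up placeholders:

# Hedged sketch: each head becomes one 24-byte '>i20s' record
# (int32 phase + 20-byte node). Node values below are made up.
draft_head = b'\x01' * 20
secret_head = b'\x02' * 20
mapping = [[], [draft_head], [secret_head]]   # public, draft, secret
data = binaryencode(mapping)
assert len(data) == 2 * _fphasesentry.size    # two records, 24 bytes each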
172 def _trackphasechange(data, rev, old, new):
173 """add a phase move to the <data> dictionary
174
175 If data is None, nothing happens.
176 """
177 if data is None:
178 return
179 existing = data.get(rev)
180 if existing is not None:
181 old = existing[0]
182 data[rev] = (old, new)
183
184 class phasecache(object):
185 def __init__(self, repo, phasedefaults, _load=True):
186 if _load:
187 # Cheap trick to allow shallow-copy without copy module
188 self.phaseroots, self.dirty = _readroots(repo, phasedefaults)
189 self._phaserevs = None
190 self._phasesets = None
191 self.filterunknown(repo)
192 self.opener = repo.svfs
193
194 def getrevset(self, repo, phases):
195 """return a smartset for the given phases"""
196 self.loadphaserevs(repo) # ensure phase's sets are loaded
197
198 if self._phasesets and all(self._phasesets[p] is not None
199 for p in phases):
200 # fast path - use _phasesets
201 revs = self._phasesets[phases[0]]
202 if len(phases) > 1:
203 revs = revs.copy() # only copy when needed
204 for p in phases[1:]:
205 revs.update(self._phasesets[p])
206 if repo.changelog.filteredrevs:
207 revs = revs - repo.changelog.filteredrevs
208 return smartset.baseset(revs)
209 else:
210 # slow path - enumerate all revisions
211 phase = self.phase
212 revs = (r for r in repo if phase(repo, r) in phases)
213 return smartset.generatorset(revs, iterasc=True)
214
215 def copy(self):
216 # Shallow copy meant to ensure isolation in
217 # advance/retractboundary(), nothing more.
218 ph = self.__class__(None, None, _load=False)
219 ph.phaseroots = self.phaseroots[:]
220 ph.dirty = self.dirty
221 ph.opener = self.opener
222 ph._phaserevs = self._phaserevs
223 ph._phasesets = self._phasesets
224 return ph
225
226 def replace(self, phcache):
227 """replace all values in 'self' with content of phcache"""
228 for a in ('phaseroots', 'dirty', 'opener', '_phaserevs', '_phasesets'):
229 setattr(self, a, getattr(phcache, a))
230
231 def _getphaserevsnative(self, repo):
232 repo = repo.unfiltered()
233 nativeroots = []
234 for phase in trackedphases:
235 nativeroots.append(map(repo.changelog.rev, self.phaseroots[phase]))
236 return repo.changelog.computephases(nativeroots)
237
238 def _computephaserevspure(self, repo):
239 repo = repo.unfiltered()
240 revs = [public] * len(repo.changelog)
241 self._phaserevs = revs
242 self._populatephaseroots(repo)
243 for phase in trackedphases:
244 roots = list(map(repo.changelog.rev, self.phaseroots[phase]))
245 if roots:
246 for rev in roots:
247 revs[rev] = phase
248 for rev in repo.changelog.descendants(roots):
249 revs[rev] = phase
250
251 def loadphaserevs(self, repo):
252 """ensure phase information is loaded in the object"""
253 if self._phaserevs is None:
254 try:
255 res = self._getphaserevsnative(repo)
256 self._phaserevs, self._phasesets = res
257 except AttributeError:
258 self._computephaserevspure(repo)
259
260 def invalidate(self):
261 self._phaserevs = None
262 self._phasesets = None
263
264 def _populatephaseroots(self, repo):
265 """Fills the _phaserevs cache with phases for the roots.
266 """
267 cl = repo.changelog
268 phaserevs = self._phaserevs
269 for phase in trackedphases:
270 roots = map(cl.rev, self.phaseroots[phase])
271 for root in roots:
272 phaserevs[root] = phase
273
274 def phase(self, repo, rev):
275 # We need a repo argument here to be able to build _phaserevs
276 # if necessary. The repository instance is not stored in
277 # phasecache to avoid reference cycles. The changelog instance
278 # is not stored because it is a filecache() property and can
279 # be replaced without us being notified.
280 if rev == nullrev:
281 return public
282 if rev < nullrev:
283 raise ValueError(_('cannot lookup negative revision'))
284 if self._phaserevs is None or rev >= len(self._phaserevs):
285 self.invalidate()
286 self.loadphaserevs(repo)
287 return self._phaserevs[rev]
288
289 def write(self):
290 if not self.dirty:
291 return
292 f = self.opener('phaseroots', 'w', atomictemp=True, checkambig=True)
293 try:
294 self._write(f)
295 finally:
296 f.close()
297
298 def _write(self, fp):
299 for phase, roots in enumerate(self.phaseroots):
300 for h in roots:
301 fp.write('%i %s\n' % (phase, hex(h)))
302 self.dirty = False
303
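For reference (not from the changeset), the 'phaseroots' file written by _write is plain text, one '<phase> <hex-node>' root per line; the hashes below are placeholders:

1 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
2 bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb

In practice the public set (phase 0) is usually empty, since public is the default phase, so a typical file lists only draft (1) and secret (2) roots.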
304 def _updateroots(self, phase, newroots, tr):
305 self.phaseroots[phase] = newroots
306 self.invalidate()
307 self.dirty = True
308
309 tr.addfilegenerator('phase', ('phaseroots',), self._write)
310 tr.hookargs['phases_moved'] = '1'
311
312 def registernew(self, repo, tr, targetphase, nodes):
313 repo = repo.unfiltered()
314 self._retractboundary(repo, tr, targetphase, nodes)
315 if tr is not None and 'phases' in tr.changes:
316 phasetracking = tr.changes['phases']
317 torev = repo.changelog.rev
318 phase = self.phase
319 for n in nodes:
320 rev = torev(n)
321 revphase = phase(repo, rev)
322 _trackphasechange(phasetracking, rev, None, revphase)
323 repo.invalidatevolatilesets()
324
325 def advanceboundary(self, repo, tr, targetphase, nodes):
326 """Set all 'nodes' to phase 'targetphase'
327
328 Nodes with a phase lower than 'targetphase' are not affected.
329 """
330 # Be careful to preserve shallow-copied values: do not update
331 # phaseroots values, replace them.
332 if tr is None:
333 phasetracking = None
334 else:
335 phasetracking = tr.changes.get('phases')
336
337 repo = repo.unfiltered()
338
339 delroots = [] # set of roots deleted by this path
340 for phase in xrange(targetphase + 1, len(allphases)):
341 # filter nodes that are not in a compatible phase already
342 nodes = [n for n in nodes
343 if self.phase(repo, repo[n].rev()) >= phase]
344 if not nodes:
345 break # no roots to move anymore
346
347 olds = self.phaseroots[phase]
348
349 affected = repo.revs('%ln::%ln', olds, nodes)
350 for r in affected:
351 _trackphasechange(phasetracking, r, self.phase(repo, r),
352 targetphase)
353
354 roots = set(ctx.node() for ctx in repo.set(
355 'roots((%ln::) - %ld)', olds, affected))
356 if olds != roots:
357 self._updateroots(phase, roots, tr)
358 # some roots may need to be declared for lower phases
359 delroots.extend(olds - roots)
360 # declare deleted roots in the target phase
361 if targetphase != 0:
362 self._retractboundary(repo, tr, targetphase, delroots)
363 repo.invalidatevolatilesets()
364
365 def retractboundary(self, repo, tr, targetphase, nodes):
366 oldroots = self.phaseroots[:targetphase + 1]
367 if tr is None:
368 phasetracking = None
369 else:
370 phasetracking = tr.changes.get('phases')
371 repo = repo.unfiltered()
372 if (self._retractboundary(repo, tr, targetphase, nodes)
373 and phasetracking is not None):
374
375 # find the affected revisions
376 new = self.phaseroots[targetphase]
377 old = oldroots[targetphase]
378 affected = set(repo.revs('(%ln::) - (%ln::)', new, old))
379
380 # find the phase of the affected revisions
381 for phase in xrange(targetphase, -1, -1):
382 if phase:
383 roots = oldroots[phase]
384 revs = set(repo.revs('%ln::%ld', roots, affected))
385 affected -= revs
386 else: # public phase
387 revs = affected
388 for r in revs:
389 _trackphasechange(phasetracking, r, phase, targetphase)
390 repo.invalidatevolatilesets()
391
392 def _retractboundary(self, repo, tr, targetphase, nodes):
393 # Be careful to preserve shallow-copied values: do not update
394 # phaseroots values, replace them.
395
396 repo = repo.unfiltered()
397 currentroots = self.phaseroots[targetphase]
398 finalroots = oldroots = set(currentroots)
399 newroots = [n for n in nodes
400 if self.phase(repo, repo[n].rev()) < targetphase]
401 if newroots:
402
403 if nullid in newroots:
404 raise error.Abort(_('cannot change null revision phase'))
405 currentroots = currentroots.copy()
406 currentroots.update(newroots)
407
408 # Only compute new roots for revs above the roots that are being
409 # retracted.
410 minnewroot = min(repo[n].rev() for n in newroots)
411 aboveroots = [n for n in currentroots
412 if repo[n].rev() >= minnewroot]
413 updatedroots = repo.set('roots(%ln::)', aboveroots)
414
415 finalroots = set(n for n in currentroots if repo[n].rev() <
416 minnewroot)
417 finalroots.update(ctx.node() for ctx in updatedroots)
418 if finalroots != oldroots:
419 self._updateroots(targetphase, finalroots, tr)
420 return True
421 return False
422
423 def filterunknown(self, repo):
424 """remove unknown nodes from the phase boundary
425
426 Nothing is lost as unknown nodes only hold data for their descendants.
427 """
428 filtered = False
429 nodemap = repo.changelog.nodemap # to filter unknown nodes
430 for phase, nodes in enumerate(self.phaseroots):
431 missing = sorted(node for node in nodes if node not in nodemap)
432 if missing:
433 for mnode in missing:
434 repo.ui.debug(
435 'removing unknown node %s from %i-phase boundary\n'
436 % (short(mnode), phase))
437 nodes.symmetric_difference_update(missing)
438 filtered = True
439 if filtered:
440 self.dirty = True
441 # filterunknown is called by repo.destroyed; we may have no changes in
442 # roots, but phaserevs contents are certainly invalid (or at least we
443 # have no proper way to check that). Related to issue 3858.
444 #
445 # The other caller is __init__, which has no _phaserevs initialized
446 # anyway. If this changes, we should consider adding a dedicated
447 # "destroyed" function to phasecache or a proper cache key mechanism
448 # (see the branchmap one).
449 self.invalidate()
450
451 def advanceboundary(repo, tr, targetphase, nodes):
452 """Add nodes to a phase, changing other nodes' phases if necessary.
453
454 This function moves the boundary *forward*: all nodes are
455 set to the target phase or kept in a *lower* phase.
456
457 Simplifies the boundary to contain phase roots only."""
458 phcache = repo._phasecache.copy()
459 phcache.advanceboundary(repo, tr, targetphase, nodes)
460 repo._phasecache.replace(phcache)
461
462 def retractboundary(repo, tr, targetphase, nodes):
463 """Set nodes back to a phase, changing other nodes' phases if
464 necessary.
465
466 This function moves the boundary *backward*: all nodes are
467 set to the target phase or kept in a *higher* phase.
468
469 Simplifies the boundary to contain phase roots only."""
470 phcache = repo._phasecache.copy()
471 phcache.retractboundary(repo, tr, targetphase, nodes)
472 repo._phasecache.replace(phcache)
473
474 def registernew(repo, tr, targetphase, nodes):
475 """register a new revision and its phase
476
477 Code adding revisions to the repository should use this function to
478 set new changesets to their target phase (or higher).
479 """
480 phcache = repo._phasecache.copy()
481 phcache.registernew(repo, tr, targetphase, nodes)
482 repo._phasecache.replace(phcache)
483
484 def listphases(repo):
485 """List phase roots for serialization over pushkey"""
486 # Use ordered dictionary so behavior is deterministic.
487 keys = util.sortdict()
488 value = '%i' % draft
489 for root in repo._phasecache.phaseroots[draft]:
490 keys[hex(root)] = value
491
492 if repo.publishing():
493 # Add extra data to let the remote know we are a publishing
494 # repo. Publishing repos can't just pretend they are old repos.
495 # When pushing to a publishing repo, the client still needs to
496 # push phase boundaries.
497 #
498 # Push does not only push changesets. It also pushes phase data.
499 # New phase data may apply to common changesets which won't be
500 # pushed (as they are common). Here is a very simple example:
501 #
502 # 1) repo A pushes changeset X as draft to repo B
503 # 2) repo B makes changeset X public
504 # 3) repo B pushes to repo A. X is not pushed, but the data that
505 # X is now public should be
506 #
507 # The server can't handle it on its own as it has no idea of
508 # client phase data.
509 keys['publishing'] = 'True'
510 return keys
511
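To illustrate (not from the changeset), for a publishing repo with a single draft root the resulting pushkey mapping would look like the following; the hash is a placeholder:

# Hedged sketch of the mapping listphases() returns:
{'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa': '1',  # a draft root
 'publishing': 'True'}                             # publishing marker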
512 def pushphase(repo, nhex, oldphasestr, newphasestr):
513 """Advance the phase of a single node over pushkey"""
514 repo = repo.unfiltered()
515 with repo.lock():
516 currentphase = repo[nhex].phase()
517 newphase = abs(int(newphasestr)) # let's avoid negative index surprise
518 oldphase = abs(int(oldphasestr)) # let's avoid negative index surprise
519 if currentphase == oldphase and newphase < oldphase:
520 with repo.transaction('pushkey-phase') as tr:
521 advanceboundary(repo, tr, newphase, [bin(nhex)])
522 return True
523 elif currentphase == newphase:
524 # raced, but got correct result
525 return True
526 else:
527 return False
528
529 def subsetphaseheads(repo, subset):
530 """Finds the phase heads for a subset of a history
531
532 Returns a list indexed by phase number where each item is a list of phase
533 head nodes.
534 """
535 cl = repo.changelog
536
537 headsbyphase = [[] for i in allphases]
538 # No need to keep track of secret phase; any heads in the subset that
539 # are not mentioned are implicitly secret.
540 for phase in allphases[:-1]:
541 revset = "heads(%%ln & %s())" % phasenames[phase]
542 headsbyphase[phase] = [cl.node(r) for r in repo.revs(revset, subset)]
543 return headsbyphase
544
545 def updatephases(repo, tr, headsbyphase):
546 """Updates the repo with the given phase heads"""
547 # Now advance phase boundaries of all but secret phase
548 for phase in allphases[:-1]:
549 advanceboundary(repo, tr, phase, headsbyphase[phase])
550
def analyzeremotephases(repo, subset, roots):
    """Compute phase heads and roots in a subset of nodes from a roots dict

    * subset is the heads of the subset
    * roots is a {<nodeid> => phase} mapping; keys and values are strings

    Accepts unknown elements in the input.
    """
    repo = repo.unfiltered()
    # build list from dictionary
    draftroots = []
    nodemap = repo.changelog.nodemap # to filter unknown nodes
    for nhex, phase in roots.iteritems():
        if nhex == 'publishing': # ignore data related to publish option
            continue
        node = bin(nhex)
        phase = int(phase)
        if phase == public:
            if node != nullid:
                repo.ui.warn(_('ignoring inconsistent public root'
                               ' from remote: %s\n') % nhex)
        elif phase == draft:
            if node in nodemap:
                draftroots.append(node)
        else:
            repo.ui.warn(_('ignoring unexpected root from remote: %i %s\n')
                         % (phase, nhex))
    # compute heads
    publicheads = newheads(repo, subset, draftroots)
    return publicheads, draftroots

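# Hedged sketch (placeholder values): 'roots' is typically the raw
# listkeys('phases') payload, so everything arrives as strings and may
# reference nodes this repo does not have:
#
#   roots = {'publishing': 'False', '<hex-of-draft-root>': '1'}
#   publicheads, draftroots = analyzeremotephases(repo, remoteheads, roots)
#   # unknown nodes and the 'publishing' entry are filtered out above
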
def newheads(repo, heads, roots):
    """compute the new heads of a subset minus another

    * `heads`: defines the first subset
    * `roots`: defines the second one, which we subtract from the first"""
    repo = repo.unfiltered()
    revset = repo.set('heads((%ln + parents(%ln)) - (%ln::%ln))',
                      heads, roots, roots, heads)
    return [c.node() for c in revset]

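# Reading the revset above: take 'heads' plus the parents of 'roots' (the
# parent of a draft root is a candidate public head), remove everything in
# the 'roots::heads' range, and keep the heads of what remains. In
# analyzeremotephases() this yields the heads that stay public once the
# remote draft roots are carved out:
#
#   publicheads = newheads(repo, remoteheads, draftroots)
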
def newcommitphase(ui):
    """helper to get the target phase of a new commit

    Handles all possible values of the phases.new-commit option.
    """
    v = ui.config('phases', 'new-commit', draft)
    try:
        return phasenames.index(v)
    except ValueError:
        try:
            return int(v)
        except ValueError:
            msg = _("phases.new-commit: not a valid phase name ('%s')")
            raise error.ConfigError(msg % v)

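# Hedged usage sketch: both spellings below resolve to the same phase,
# since the helper accepts either a phase name or its integer index:
#
#   [phases]
#   new-commit = secret      # equivalent to: new-commit = 2
#
#   newcommitphase(repo.ui)  # -> 2 (the index of 'secret' in phasenames)
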
def hassecret(repo):
    """utility function that checks if a repo has any secret changesets."""
    return bool(repo._phasecache.phaseroots[2])
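
# phaseroots is indexed by phase number, so phaseroots[2] is the set of
# secret roots; the repo has secret changesets iff that set is non-empty.
# Hedged sketch of a typical caller-side guard (message illustrative):
#
#   if hassecret(repo):
#       raise error.Abort(_('cannot bundle secret changesets'))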