bundles: turn nbchanges int into a bytestr using pycompat.bytestr...
Augie Fackler
r34218:8e035802 default
@@ -1,1917 +1,1917 @@
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is architected as follows

 - magic string
 - stream level parameters
 - payload parts (any number)
 - end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

Binary format is as follows

:params size: int32

The total number of bytes used by the parameters

:params value: arbitrary number of bytes

A blob of `params size` containing the serialized version of all stream level
parameters.

The blob contains a space separated list of parameters. Parameters with value
are stored in the form `<name>=<value>`. Both name and value are urlquoted.

Empty names are obviously forbidden.

A name MUST start with a letter. If this first letter is lower case, the
parameter is advisory and can be safely ignored. However, when the first
letter is capital, the parameter is mandatory and the bundling process MUST
stop if it is not able to process it.

Stream parameters use a simple textual format for two main reasons:

- Stream level parameters should remain simple and we want to discourage any
  crazy usage.
- Textual data allow easy human inspection of a bundle2 header in case of
  trouble.

Any applicative level options MUST go into a bundle2 part instead.

Payload part
------------------------

Binary format is as follows

:header size: int32

    The total number of bytes used by the part header. When the header is
    empty (size = 0) this is interpreted as the end of stream marker.

:header:

    The header defines how to interpret the part. It contains two pieces of
    data: the part type, and the part parameters.

    The part type is used to route to an application level handler that can
    interpret the payload.

    Part parameters are passed to the application level handler. They are
    meant to convey information that will help the application level object
    to interpret the part payload.

    The binary format of the header is as follows

    :typesize: (one byte)

    :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

    :partid: A 32-bit integer (unique in the bundle) that can be used to refer
             to this part.

    :parameters:

        A part's parameters may have arbitrary content; the binary structure
        is::

            <mandatory-count><advisory-count><param-sizes><param-data>

        :mandatory-count: 1 byte, number of mandatory parameters

        :advisory-count: 1 byte, number of advisory parameters

        :param-sizes:

            N couples of bytes, where N is the total number of parameters.
            Each couple contains (<size-of-key>, <size-of-value>) for one
            parameter.

        :param-data:

            A blob of bytes from which each parameter key and value can be
            retrieved using the list of size couples stored in the previous
            field.

        Mandatory parameters come first, then the advisory ones.

        Each parameter's key MUST be unique within the part.

:payload:

    The payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as much as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part
type contains any uppercase char it is considered mandatory. When no handler
is known for a mandatory part, the process is aborted and an exception is
raised. If the part is advisory and no handler is known, the part is ignored.
When the process is aborted, the full bundle is still read from the stream to
keep the channel usable. But none of the parts read after an abort are
processed. In the future, dropping the stream may become an option for
channels we do not care to preserve.
"""

from __future__ import absolute_import, division

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    phases,
    pushkey,
    pycompat,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

_fphasesentry = '>i20s'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
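
# A minimal illustrative sketch (not part of the original module): framing and
# unframing the stream-level header described in the module docstring, using
# the struct formats above. The `_example_*` helpers are hypothetical names
# added purely for demonstration.
def _example_writestreamheader(paramblob):
    """frame the magic string plus an already-encoded parameter blob"""
    return 'HG20' + _pack(_fstreamparamsize, len(paramblob)) + paramblob

def _example_readstreamheader(fp):
    """read back (magic string, parameter blob) from a file-like object"""
    magic = fp.read(4)
    size = _unpack(_fstreamparamsize, fp.read(4))[0]
    return magic, fp.read(size)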

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>'+('BB'*nbparams)
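
# Illustrative sketch (not part of the original module): round-tripping the
# per-part parameter size couples through the dynamically built format. The
# helper is hypothetical and only demonstrates the wire layout documented in
# the module docstring.
def _example_paramsizesroundtrip():
    fmt = _makefpartparamsizes(2)   # '>BBBB': two (key, value) size couples
    blob = _pack(fmt, 3, 5, 4, 0)   # e.g. a 3/5 byte pair and a 4/0 byte pair
    return _unpack(fmt, blob)       # -> (3, 5, 4, 0)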

parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iteration happens in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__
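
# Illustrative usage sketch (not part of the original module): records are
# grouped by category while iteration stays chronological. Hypothetical helper
# for demonstration only.
def _example_unbundlerecords():
    records = unbundlerecords()
    records.add('changegroup', {'return': 1})
    records.add('output', 'some text')
    assert records['changegroup'] == ({'return': 1},)
    assert list(records) == [('changegroup', {'return': 1}),
                             ('output', 'some text')]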

class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.reply = None
        self.captureoutput = captureoutput
        self.hookargs = {}
        self._gettransaction = transactiongetter

    def gettransaction(self):
        transaction = self._gettransaction()

        if self.hookargs:
            # the ones added to the transaction supersede those added
            # to the operation.
            self.hookargs.update(transaction.hookargs)
            transaction.hookargs = self.hookargs

            # mark the hookargs as flushed. further attempts to add to
            # hookargs will result in an abort.
            self.hookargs = None

        return transaction

    def addhookargs(self, hookargs):
        if self.hookargs is None:
            raise error.ProgrammingError('attempted to add hookargs to '
                                         'operation after transaction started')
        self.hookargs.update(hookargs)

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op

class partiterator(object):
    def __init__(self, repo, op, unbundler):
        self.repo = repo
        self.op = op
        self.unbundler = unbundler
        self.iterator = None
        self.count = 0

    def __enter__(self):
        def func():
            itr = enumerate(self.unbundler.iterparts())
            for count, p in itr:
                self.count = count
                yield p
        self.iterator = func()
        return self.iterator

    def __exit__(self, type, exc, tb):
        if not self.iterator:
            return

        if exc:
            # Any exceptions seeking to the end of the bundle at this point are
            # almost certainly related to the underlying stream being bad.
            # And, chances are that the exception we're handling is related to
            # getting in that bad state. So, we swallow the seeking error and
            # re-raise the original error.
            seekerror = False
            try:
                for part in self.iterator:
                    # consume the bundle content
                    part.seek(0, 2)
            except Exception:
                seekerror = True

            # Small hack to let caller code distinguish exceptions from bundle2
            # processing from processing the old format. This is mostly needed
            # to handle different return codes to unbundle according to the type
            # of bundle. We should probably clean up or drop this return code
            # craziness in a future version.
            exc.duringunbundle2 = True
            salvaged = []
            replycaps = None
            if self.op.reply is not None:
                salvaged = self.op.reply.salvageoutput()
                replycaps = self.op.reply.capabilities
            exc._replycaps = replycaps
            exc._bundle2salvagedoutput = salvaged

            # Re-raising from a variable loses the original stack. So only use
            # that form if we need to.
            if seekerror:
                raise exc

        self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
                           self.count)

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op._gettransaction is None or op._gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))

    with partiterator(repo, op, unbundler) as parts:
        for part in parts:
            _processpart(op, part)

    return op

def _processchangegroup(op, cg, tr, source, url, **kwargs):
    ret = cg.apply(op.repo, tr, source, url, **kwargs)
    op.records.add('changegroup', {
        'return': ret,
    })
    return ret

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    hardabort = False
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                      params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip the part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam(
                    'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
    # If exiting or interrupted, do not attempt to seek the stream in the
    # finally block below. This makes abort faster.
    except (SystemExit, KeyboardInterrupt):
        hardabort = True
        raise
    finally:
        # consume the part content to not corrupt the stream.
        if not hardabort:
            part.seek(0, 2)


def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line).
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)
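
# Illustrative sketch (not part of the original module): encodecaps and
# decodecaps round-trip the documented blob format. Note that decodecaps
# always returns list values, even for capabilities encoded without one.
def _example_capsroundtrip():
    blob = encodecaps({'HG20': (), 'changegroup': ['01', '02']})
    # blob == 'HG20\nchangegroup=01,02'
    return decodecaps(blob)   # -> {'HG20': [], 'changegroup': ['01', '02']}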

bundletypes = {
    "": ("", 'UN'), # only when using unbundle on ssh and old http servers
                    # since the unification ssh accepts a header but there
                    # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding one if you
        need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we preserve
        server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged
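
# Illustrative sketch (not part of the original module): assembling a minimal,
# empty bundle2 stream. The stand-in ui class below only provides what
# bundle20 actually touches; real callers pass a mercurial ui object instead.
class _examplestubui(object):
    debugflag = False
    def configbool(self, section, name):
        return False
    def debug(self, msg):
        pass

def _example_emptybundle():
    bundler = bundle20(_examplestubui())
    # 'HG20', a zero-length parameter block, then the end-of-stream marker
    return ''.join(bundler.getchunks())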


class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)
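
# Illustrative sketch (not part of the original module): consuming the payload
# framing documented above -- a series of <int32 size><data> chunks closed by
# a zero-size chunk. Negative sizes (special case processing) are rejected
# here rather than handled.
def _example_iterpayloadchunks(fp):
    while True:
        size = _unpack(_fpayloadsize, changegroup.readexactly(fp, 4))[0]
        if size == 0:
            break
        if size < 0:
            raise error.BundleValueError('negative chunk size: %i' % size)
        yield changegroup.readexactly(fp, size)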

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process all stream level parameters found in a params block"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory, and this function will raise a KeyError when they are
        unknown.

        Note: no options are currently supported. Any input will either be
        ignored or fail.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends, relying on the bundle to
        be interpreted to know its end. This is terrible and we are sorry, but
        we needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)


    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            # Seek to the end of the part to force its consumption so the next
            # part can be read. But then seek back to the beginning so the
            # code consuming this generator has a part that starts at 0.
            part.seek(0, 2)
            part.seek(0)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

formatmap = {'20': unbundle20}
881
881
882 b2streamparamsmap = {}
882 b2streamparamsmap = {}
883
883
884 def b2streamparamhandler(name):
884 def b2streamparamhandler(name):
885 """register a handler for a stream level parameter"""
885 """register a handler for a stream level parameter"""
886 def decorator(func):
886 def decorator(func):
887 assert name not in formatmap
887 assert name not in formatmap
888 b2streamparamsmap[name] = func
888 b2streamparamsmap[name] = func
889 return func
889 return func
890 return decorator
890 return decorator
891
891
892 @b2streamparamhandler('compression')
892 @b2streamparamhandler('compression')
893 def processcompression(unbundler, param, value):
893 def processcompression(unbundler, param, value):
894 """read compression parameter and install payload decompression"""
894 """read compression parameter and install payload decompression"""
895 if value not in util.compengines.supportedbundletypes:
895 if value not in util.compengines.supportedbundletypes:
896 raise error.BundleUnknownFeatureError(params=(param,),
896 raise error.BundleUnknownFeatureError(params=(param,),
897 values=(value,))
897 values=(value,))
898 unbundler._compengine = util.compengines.forbundletype(value)
898 unbundler._compengine = util.compengines.forbundletype(value)
899 if value is not None:
899 if value is not None:
900 unbundler._compressed = True
900 unbundler._compressed = True
901
901
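# Registering a handler for another stream parameter follows the same
# decorator pattern; e.g. for a purely hypothetical 'myfeature' parameter:
#
#   @b2streamparamhandler('myfeature')
#   def processmyfeature(unbundler, param, value):
#       unbundler._myfeature = value
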
class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. Remote side
    should be able to safely ignore the advisory ones.

    Neither data nor parameters can be modified after generation has begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
                % (cls, id(self), self.id, self.type, self.mandatory))

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no part id assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

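    # For instance, part.addparam('version', '02') records a mandatory
    # parameter, while part.addparam('nbchanges', '3', mandatory=False)
    # records an advisory one that a remote may safely ignore.
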
    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                 ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            bexc = util.forcebytestr(exc)
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % bexc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % bexc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

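    # Rough shape of what getchunks() emits for one part, as assembled above
    # (field widths assume the _f* struct formats defined in this module):
    #
    #   <int32: header size><header><int32: chunk size><chunk>...<int32: 0>
    #
    # with the header itself being:
    #
    #   <uint8: len(parttype)><parttype><uint32: part id>
    #   <uint8: #mandatory params><uint8: #advisory params>
    #   <param sizes><param keys and values>
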
    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if (util.safehasattr(self.data, 'next')
            or util.safehasattr(self.data, '__next__')):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting an exception raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] # (payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to setup all logic related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos, self._tellfp()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

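    # For example, with self._chunkindex == [(0, f0), (4096, f1), (8192, f2)],
    # _findchunk(4096) returns (1, 0) and _findchunk(5000) returns (1, 904):
    # payload offset 5000 lives 904 bytes into the chunk starting at 4096.
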
    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of pairs again
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

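    # iterparts() relies on exactly this: part.seek(0, 2) drains the payload
    # (building the chunk index as a side effect) and the following
    # part.seek(0) uses _findchunk() to move the underlying file back to the
    # recorded start of chunk 0.
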
    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

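# For example, obsmarkersversion({'obsmarkers': ('V0', 'V1')}) == [0, 1],
# while a caps dict without an 'obsmarkers' entry yields [].
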
def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.makechangegroup(repo, outgoing, '01', source)
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The type of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we always want a changegroup in such bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                      mandatory=False)
    if opts.get('phases') and repo.revs('%ln and secret()',
                                        outgoing.missingheads):
        part.addparam('targetphase', '%d' % phases.secret, mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = []
        for phase in phases.allphases:
            for head in headsbyphase[phase]:
                phasedata.append(_pack(_fphasesentry, phase, head))
        bundler.newpart('phase-heads', data=''.join(phasedata))

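# As a sketch, an opts mapping such as {'cg.version': '02', 'phases': True,
# 'obsolescence': True} would produce a changegroup part plus the optional
# phase-heads and obsmarkers parts handled above.
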
def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changeset
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only support v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

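# A hypothetical call such as writebundle(ui, cg, 'dump.hg', 'HG20',
# compression='BZ') would take the bundle2 branch above and compress the
# stream with the 'BZ' engine, while a legacy 'HG10BZ' bundletype would go
# through the plain-header branch instead.
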
def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

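# E.g. results of [3, 2] combine to 4: two heads added by one changegroup,
# one by the other, for a net +3 on top of the baseline result of 1, while
# any 0 in the results short-circuits the combination to 0.
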
@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
                             'targetphase'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will see massive rework before
    being inflicted on any end user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the one contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    extrakwargs = {}
    targetphase = inpart.params.get('targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

1609 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1609 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1610 ['digest:%s' % k for k in util.DIGESTS.keys()])
1610 ['digest:%s' % k for k in util.DIGESTS.keys()])
1611 @parthandler('remote-changegroup', _remotechangegroupparams)
1611 @parthandler('remote-changegroup', _remotechangegroupparams)
1612 def handleremotechangegroup(op, inpart):
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate that what was
        retrieved by the client matches what the server knows about the
        bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest
        with that name. Like the size, it is used to validate that what was
        retrieved by the client matches what the server knows about the
        bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

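# Illustrative sketch (not part of the original code): the parameters a
# sender of a 'remote-changegroup' part would provide so that the size and
# digest checks above can succeed. The helper name is hypothetical.
def _remotechangegroupparams(rawurl, data):
    import hashlib
    return {'url': rawurl,
            'size': '%d' % len(data),
            'digests': 'sha1',
            'digest:sha1': hashlib.sha1(data).hexdigest()}
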
@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

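# Illustrative note (hypothetical helper): the 'check:heads' payload is
# simply the concatenation of 20-byte binary node ids, which is what the
# fixed-size read loop above consumes.
def _encodecheckheads(heads):
    assert all(len(h) == 20 for h in heads)
    return ''.join(heads)
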
@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually
    updated during the push. If other activity happens on unrelated heads,
    it is ignored.

    This allows a server with high traffic to avoid push contention as long
    as only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

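# Illustrative sketch (hypothetical helper): the parameters a sender would
# attach to a 'pushkey' part, mirroring the pushkey.decode calls above.
def _pushkeypartparams(namespace, key, old, new):
    enc = pushkey.encode
    return [('namespace', enc(namespace)), ('key', enc(key)),
            ('old', enc(old)), ('new', enc(new))]
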
def _readphaseheads(inpart):
    headsbyphase = [[] for i in phases.allphases]
    entrysize = struct.calcsize(_fphasesentry)
    while True:
        entry = inpart.read(entrysize)
        if len(entry) < entrysize:
            if entry:
                raise error.Abort(_('bad phase-heads bundle part'))
            break
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
    return headsbyphase

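# Illustrative counterpart (hypothetical helper): writing entries in the
# same fixed-size _fphasesentry format that _readphaseheads consumes.
def _writephaseheads(headsbyphase):
    chunks = []
    for phase, heads in enumerate(headsbyphase):
        for node in heads:
            chunks.append(struct.pack(_fphasesentry, phase, node))
    return ''.join(chunks)
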
@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = _readphaseheads(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
    op.records.add('phase-heads', {})

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)

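# Illustrative sketch (hypothetical helper): producing the 'hgtagsfnodes'
# payload the loop above consumes, i.e. concatenated pairs of 20-byte
# changeset nodes and .hgtags filenodes.
def _encodefnodespayload(entries):
    chunks = []
    for node, fnode in entries:
        assert len(node) == 20 and len(fnode) == 20
        chunks.append(node + fnode)
    return ''.join(chunks)
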
@parthandler('pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on server-side for security reasons
    if op.ui.configbool('push', 'pushvars.server'):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvar flag.
            key = "USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)
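
# Example (values assumed for illustration): a client running
#   hg push --pushvar DEBUG=1
# results in the hook environment containing USERVAR_DEBUG=1, provided the
# server enables the option read above:
#   [push]
#   pushvars.server = true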
@@ -1,2012 +1,2013 b''
# exchange.py - utility to exchange data between repos.
#
# Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import errno
import hashlib

from .i18n import _
from .node import (
    hex,
    nullid,
)
from . import (
    bookmarks as bookmod,
    bundle2,
    changegroup,
    discovery,
    error,
    lock as lockmod,
    obsolete,
    phases,
    pushkey,
    pycompat,
    scmutil,
    sslutil,
    streamclone,
    url as urlmod,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

# Maps bundle version human names to changegroup versions.
_bundlespeccgversions = {'v1': '01',
                         'v2': '02',
                         'packed1': 's1',
                         'bundle2': '02', #legacy
                        }

# Compression engines allowed in version 1. THIS SHOULD NEVER CHANGE.
_bundlespecv1compengines = {'gzip', 'bzip2', 'none'}

def parsebundlespec(repo, spec, strict=True, externalnames=False):
    """Parse a bundle string specification into parts.

    Bundle specifications denote a well-defined bundle/exchange format.
    The content of a given specification should not change over time in
    order to ensure that bundles produced by a newer version of Mercurial are
    readable from an older version.

    The string currently has the form:

       <compression>-<type>[;<parameter0>[;<parameter1>]]

    Where <compression> is one of the supported compression formats
    and <type> is (currently) a version string. A ";" can follow the type and
    all text afterwards is interpreted as URI encoded, ";" delimited key=value
    pairs.

    If ``strict`` is True (the default) <compression> is required. Otherwise,
    it is optional.

    If ``externalnames`` is False (the default), the human-centric names will
    be converted to their internal representation.

    Returns a 3-tuple of (compression, version, parameters). Compression will
    be ``None`` if not in strict mode and a compression isn't defined.

    An ``InvalidBundleSpecification`` is raised when the specification is
    not syntactically well formed.

    An ``UnsupportedBundleSpecification`` is raised when the compression or
    bundle type/version is not recognized.

    Note: this function will likely eventually return a more complex data
    structure, including bundle2 part information.
    """
    def parseparams(s):
        if ';' not in s:
            return s, {}

        params = {}
        version, paramstr = s.split(';', 1)

        for p in paramstr.split(';'):
            if '=' not in p:
                raise error.InvalidBundleSpecification(
                    _('invalid bundle specification: '
                      'missing "=" in parameter: %s') % p)

            key, value = p.split('=', 1)
            key = urlreq.unquote(key)
            value = urlreq.unquote(value)
            params[key] = value

        return version, params


    if strict and '-' not in spec:
        raise error.InvalidBundleSpecification(
            _('invalid bundle specification; '
              'must be prefixed with compression: %s') % spec)

    if '-' in spec:
        compression, version = spec.split('-', 1)

        if compression not in util.compengines.supportedbundlenames:
            raise error.UnsupportedBundleSpecification(
                _('%s compression is not supported') % compression)

        version, params = parseparams(version)

        if version not in _bundlespeccgversions:
            raise error.UnsupportedBundleSpecification(
                _('%s is not a recognized bundle version') % version)
    else:
        # Value could be just the compression or just the version, in which
        # case some defaults are assumed (but only when not in strict mode).
        assert not strict

        spec, params = parseparams(spec)

        if spec in util.compengines.supportedbundlenames:
            compression = spec
            version = 'v1'
            # Generaldelta repos require v2.
            if 'generaldelta' in repo.requirements:
                version = 'v2'
            # Modern compression engines require v2.
            if compression not in _bundlespecv1compengines:
                version = 'v2'
        elif spec in _bundlespeccgversions:
            if spec == 'packed1':
                compression = 'none'
            else:
                compression = 'bzip2'
            version = spec
        else:
            raise error.UnsupportedBundleSpecification(
                _('%s is not a recognized bundle specification') % spec)

    # Bundle version 1 only supports a known set of compression engines.
    if version == 'v1' and compression not in _bundlespecv1compengines:
        raise error.UnsupportedBundleSpecification(
            _('compression engine %s is not supported on v1 bundles') %
            compression)

    # The specification for packed1 can optionally declare the data formats
    # required to apply it. If we see this metadata, compare against what the
    # repo supports and error if the bundle isn't compatible.
    if version == 'packed1' and 'requirements' in params:
        requirements = set(params['requirements'].split(','))
        missingreqs = requirements - repo.supportedformats
        if missingreqs:
            raise error.UnsupportedBundleSpecification(
                _('missing support for repository features: %s') %
                ', '.join(sorted(missingreqs)))

    if not externalnames:
        engine = util.compengines.forbundlename(compression)
        compression = engine.bundletype()[1]
        version = _bundlespeccgversions[version]
    return compression, version, params

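# Usage sketch (expected values are assumptions for illustration): in strict
# mode the human-centric names are converted to internal ones.
def _parsespecexample(repo):
    # internal names: e.g. ('GZ', '02', {})
    internal = parsebundlespec(repo, 'gzip-v2')
    # human-centric names preserved: e.g. ('gzip', 'v2', {})
    external = parsebundlespec(repo, 'gzip-v2', externalnames=True)
    return internal, external
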
def readbundle(ui, fh, fname, vfs=None):
    header = changegroup.readexactly(fh, 4)

    alg = None
    if not fname:
        fname = "stream"
        if not header.startswith('HG') and header.startswith('\0'):
            fh = changegroup.headerlessfixup(fh, header)
            header = "HG10"
            alg = 'UN'
    elif vfs:
        fname = vfs.join(fname)

    magic, version = header[0:2], header[2:4]

    if magic != 'HG':
        raise error.Abort(_('%s: not a Mercurial bundle') % fname)
    if version == '10':
        if alg is None:
            alg = changegroup.readexactly(fh, 2)
        return changegroup.cg1unpacker(fh, alg)
    elif version.startswith('2'):
        return bundle2.getunbundler(ui, fh, magicstring=magic + version)
    elif version == 'S1':
        return streamclone.streamcloneapplier(fh)
    else:
        raise error.Abort(_('%s: unknown bundle version %s') % (fname, version))

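# Illustrative helper (hypothetical, mirroring the dispatch in readbundle
# above): classify a 4-byte bundle header without consuming the stream.
def _headerkind(header):
    if not header.startswith('HG'):
        return 'headerless changegroup'  # assumed uncompressed cg1 data
    version = header[2:4]
    if version == '10':
        return 'changegroup v1'
    if version.startswith('2'):
        return 'bundle2'
    if version == 'S1':
        return 'stream clone'
    return 'unknown'
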
def getbundlespec(ui, fh):
    """Infer the bundlespec from a bundle file handle.

    The input file handle is seeked and the original seek position is not
    restored.
    """
    def speccompression(alg):
        try:
            return util.compengines.forbundletype(alg).bundletype()[0]
        except KeyError:
            return None

    b = readbundle(ui, fh, None)
    if isinstance(b, changegroup.cg1unpacker):
        alg = b._type
        if alg == '_truncatedBZ':
            alg = 'BZ'
        comp = speccompression(alg)
        if not comp:
            raise error.Abort(_('unknown compression algorithm: %s') % alg)
        return '%s-v1' % comp
    elif isinstance(b, bundle2.unbundle20):
        if 'Compression' in b.params:
            comp = speccompression(b.params['Compression'])
            if not comp:
                raise error.Abort(_('unknown compression algorithm: %s') % comp)
        else:
            comp = 'none'

        version = None
        for part in b.iterparts():
            if part.type == 'changegroup':
                version = part.params['version']
                if version in ('01', '02'):
                    version = 'v2'
                else:
                    raise error.Abort(_('changegroup version %s does not have '
                                        'a known bundlespec') % version,
                                      hint=_('try upgrading your Mercurial '
                                             'client'))

        if not version:
            raise error.Abort(_('could not identify changegroup version in '
                                'bundle'))

        return '%s-%s' % (comp, version)
    elif isinstance(b, streamclone.streamcloneapplier):
        requirements = streamclone.readbundle1header(fh)[2]
        params = 'requirements=%s' % ','.join(sorted(requirements))
        return 'none-packed1;%s' % urlreq.quote(params)
    else:
        raise error.Abort(_('unknown bundle type: %s') % b)

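# Illustrative wrapper (name and path are hypothetical): getbundlespec
# consumes the handle's position, so open a fresh handle per call.
def _specforfile(ui, path):
    with open(path, 'rb') as fh:
        return getbundlespec(ui, fh)  # e.g. 'bzip2-v2' or 'none-packed1;...'
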
def _computeoutgoing(repo, heads, common):
    """Computes which revs are outgoing given a set of common
    and a set of heads.

    This is a separate function so extensions can have access to
    the logic.

    Returns a discovery.outgoing object.
    """
    cl = repo.changelog
    if common:
        hasnode = cl.hasnode
        common = [n for n in common if hasnode(n)]
    else:
        common = [nullid]
    if not heads:
        heads = cl.heads()
    return discovery.outgoing(repo, common, heads)

def _forcebundle1(op):
    """return true if a pull/push must use bundle1

    This function is used to allow testing of the older bundle version"""
    ui = op.repo.ui
    forcebundle1 = False
    # The goal of this config is to allow developers to choose the bundle
    # version used during exchange. This is especially handy during tests.
    # The value is a list of bundle versions to pick from; the highest
    # version should be used.
    #
    # developer config: devel.legacy.exchange
    exchange = ui.configlist('devel', 'legacy.exchange')
    forcebundle1 = 'bundle2' not in exchange and 'bundle1' in exchange
    return forcebundle1 or not op.remote.capable('bundle2')

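# Example (for tests, per the 'devel.legacy.exchange' option above): an hgrc
# forcing the bundle1 exchange path even when the remote supports bundle2:
#   [devel]
#   legacy.exchange = bundle1
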
287 class pushoperation(object):
287 class pushoperation(object):
288 """A object that represent a single push operation
288 """A object that represent a single push operation
289
289
290 Its purpose is to carry push related state and very common operations.
290 Its purpose is to carry push related state and very common operations.
291
291
292 A new pushoperation should be created at the beginning of each push and
292 A new pushoperation should be created at the beginning of each push and
293 discarded afterward.
293 discarded afterward.
294 """
294 """
295
295
296 def __init__(self, repo, remote, force=False, revs=None, newbranch=False,
296 def __init__(self, repo, remote, force=False, revs=None, newbranch=False,
297 bookmarks=(), pushvars=None):
297 bookmarks=(), pushvars=None):
298 # repo we push from
298 # repo we push from
299 self.repo = repo
299 self.repo = repo
300 self.ui = repo.ui
300 self.ui = repo.ui
301 # repo we push to
301 # repo we push to
302 self.remote = remote
302 self.remote = remote
303 # force option provided
303 # force option provided
304 self.force = force
304 self.force = force
305 # revs to be pushed (None is "all")
305 # revs to be pushed (None is "all")
306 self.revs = revs
306 self.revs = revs
307 # bookmark explicitly pushed
307 # bookmark explicitly pushed
308 self.bookmarks = bookmarks
308 self.bookmarks = bookmarks
309 # allow push of new branch
309 # allow push of new branch
310 self.newbranch = newbranch
310 self.newbranch = newbranch
311 # step already performed
311 # step already performed
312 # (used to check what steps have been already performed through bundle2)
312 # (used to check what steps have been already performed through bundle2)
313 self.stepsdone = set()
313 self.stepsdone = set()
314 # Integer version of the changegroup push result
314 # Integer version of the changegroup push result
315 # - None means nothing to push
315 # - None means nothing to push
316 # - 0 means HTTP error
316 # - 0 means HTTP error
317 # - 1 means we pushed and remote head count is unchanged *or*
317 # - 1 means we pushed and remote head count is unchanged *or*
318 # we have outgoing changesets but refused to push
318 # we have outgoing changesets but refused to push
319 # - other values as described by addchangegroup()
319 # - other values as described by addchangegroup()
320 self.cgresult = None
320 self.cgresult = None
321 # Boolean value for the bookmark push
321 # Boolean value for the bookmark push
322 self.bkresult = None
322 self.bkresult = None
323 # discover.outgoing object (contains common and outgoing data)
323 # discover.outgoing object (contains common and outgoing data)
324 self.outgoing = None
324 self.outgoing = None
325 # all remote topological heads before the push
325 # all remote topological heads before the push
326 self.remoteheads = None
326 self.remoteheads = None
327 # Details of the remote branch pre and post push
327 # Details of the remote branch pre and post push
328 #
328 #
329 # mapping: {'branch': ([remoteheads],
329 # mapping: {'branch': ([remoteheads],
330 # [newheads],
330 # [newheads],
331 # [unsyncedheads],
331 # [unsyncedheads],
332 # [discardedheads])}
332 # [discardedheads])}
333 # - branch: the branch name
333 # - branch: the branch name
334 # - remoteheads: the list of remote heads known locally
334 # - remoteheads: the list of remote heads known locally
335 # None if the branch is new
335 # None if the branch is new
336 # - newheads: the new remote heads (known locally) with outgoing pushed
336 # - newheads: the new remote heads (known locally) with outgoing pushed
337 # - unsyncedheads: the list of remote heads unknown locally.
337 # - unsyncedheads: the list of remote heads unknown locally.
338 # - discardedheads: the list of remote heads made obsolete by the push
338 # - discardedheads: the list of remote heads made obsolete by the push
339 self.pushbranchmap = None
339 self.pushbranchmap = None
340 # testable as a boolean indicating if any nodes are missing locally.
340 # testable as a boolean indicating if any nodes are missing locally.
341 self.incoming = None
341 self.incoming = None
342 # phases changes that must be pushed along side the changesets
342 # phases changes that must be pushed along side the changesets
343 self.outdatedphases = None
343 self.outdatedphases = None
344 # phases changes that must be pushed if changeset push fails
344 # phases changes that must be pushed if changeset push fails
345 self.fallbackoutdatedphases = None
345 self.fallbackoutdatedphases = None
346 # outgoing obsmarkers
346 # outgoing obsmarkers
347 self.outobsmarkers = set()
347 self.outobsmarkers = set()
348 # outgoing bookmarks
348 # outgoing bookmarks
349 self.outbookmarks = []
349 self.outbookmarks = []
350 # transaction manager
350 # transaction manager
351 self.trmanager = None
351 self.trmanager = None
352 # map { pushkey partid -> callback handling failure}
352 # map { pushkey partid -> callback handling failure}
353 # used to handle exception from mandatory pushkey part failure
353 # used to handle exception from mandatory pushkey part failure
354 self.pkfailcb = {}
354 self.pkfailcb = {}
355 # an iterable of pushvars or None
355 # an iterable of pushvars or None
356 self.pushvars = pushvars
356 self.pushvars = pushvars
357
357
358 @util.propertycache
358 @util.propertycache
359 def futureheads(self):
359 def futureheads(self):
360 """future remote heads if the changeset push succeeds"""
360 """future remote heads if the changeset push succeeds"""
361 return self.outgoing.missingheads
361 return self.outgoing.missingheads
362
362
363 @util.propertycache
363 @util.propertycache
364 def fallbackheads(self):
364 def fallbackheads(self):
365 """future remote heads if the changeset push fails"""
365 """future remote heads if the changeset push fails"""
366 if self.revs is None:
366 if self.revs is None:
367 # not target to push, all common are relevant
367 # not target to push, all common are relevant
368 return self.outgoing.commonheads
368 return self.outgoing.commonheads
369 unfi = self.repo.unfiltered()
369 unfi = self.repo.unfiltered()
370 # I want cheads = heads(::missingheads and ::commonheads)
370 # I want cheads = heads(::missingheads and ::commonheads)
371 # (missingheads is revs with secret changeset filtered out)
371 # (missingheads is revs with secret changeset filtered out)
372 #
372 #
373 # This can be expressed as:
373 # This can be expressed as:
374 # cheads = ( (missingheads and ::commonheads)
374 # cheads = ( (missingheads and ::commonheads)
375 # + (commonheads and ::missingheads))"
375 # + (commonheads and ::missingheads))"
376 # )
376 # )
377 #
377 #
378 # while trying to push we already computed the following:
378 # while trying to push we already computed the following:
379 # common = (::commonheads)
379 # common = (::commonheads)
380 # missing = ((commonheads::missingheads) - commonheads)
380 # missing = ((commonheads::missingheads) - commonheads)
381 #
381 #
382 # We can pick:
382 # We can pick:
383 # * missingheads part of common (::commonheads)
383 # * missingheads part of common (::commonheads)
384 common = self.outgoing.common
384 common = self.outgoing.common
385 nm = self.repo.changelog.nodemap
385 nm = self.repo.changelog.nodemap
386 cheads = [node for node in self.revs if nm[node] in common]
386 cheads = [node for node in self.revs if nm[node] in common]
387 # and
387 # and
388 # * commonheads parents on missing
388 # * commonheads parents on missing
389 revset = unfi.set('%ln and parents(roots(%ln))',
389 revset = unfi.set('%ln and parents(roots(%ln))',
390 self.outgoing.commonheads,
390 self.outgoing.commonheads,
391 self.outgoing.missing)
391 self.outgoing.missing)
392 cheads.extend(c.node() for c in revset)
392 cheads.extend(c.node() for c in revset)
393 return cheads
393 return cheads
394
394
395 @property
395 @property
396 def commonheads(self):
396 def commonheads(self):
397 """set of all common heads after changeset bundle push"""
397 """set of all common heads after changeset bundle push"""
398 if self.cgresult:
398 if self.cgresult:
399 return self.futureheads
399 return self.futureheads
400 else:
400 else:
401 return self.fallbackheads
401 return self.fallbackheads
402
402
403 # mapping of message used when pushing bookmark
403 # mapping of message used when pushing bookmark
404 bookmsgmap = {'update': (_("updating bookmark %s\n"),
404 bookmsgmap = {'update': (_("updating bookmark %s\n"),
405 _('updating bookmark %s failed!\n')),
405 _('updating bookmark %s failed!\n')),
406 'export': (_("exporting bookmark %s\n"),
406 'export': (_("exporting bookmark %s\n"),
407 _('exporting bookmark %s failed!\n')),
407 _('exporting bookmark %s failed!\n')),
408 'delete': (_("deleting remote bookmark %s\n"),
408 'delete': (_("deleting remote bookmark %s\n"),
409 _('deleting remote bookmark %s failed!\n')),
409 _('deleting remote bookmark %s failed!\n')),
410 }
410 }
411
411
412
412
413 def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=(),
413 def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=(),
414 opargs=None):
414 opargs=None):
415 '''Push outgoing changesets (limited by revs) from a local
415 '''Push outgoing changesets (limited by revs) from a local
416 repository to remote. Return an integer:
416 repository to remote. Return an integer:
417 - None means nothing to push
417 - None means nothing to push
418 - 0 means HTTP error
418 - 0 means HTTP error
419 - 1 means we pushed and remote head count is unchanged *or*
419 - 1 means we pushed and remote head count is unchanged *or*
420 we have outgoing changesets but refused to push
420 we have outgoing changesets but refused to push
421 - other values as described by addchangegroup()
421 - other values as described by addchangegroup()
422 '''
422 '''
423 if opargs is None:
423 if opargs is None:
424 opargs = {}
424 opargs = {}
425 pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks,
425 pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks,
426 **pycompat.strkwargs(opargs))
426 **pycompat.strkwargs(opargs))
427 if pushop.remote.local():
427 if pushop.remote.local():
428 missing = (set(pushop.repo.requirements)
428 missing = (set(pushop.repo.requirements)
429 - pushop.remote.local().supported)
429 - pushop.remote.local().supported)
430 if missing:
430 if missing:
431 msg = _("required features are not"
431 msg = _("required features are not"
432 " supported in the destination:"
432 " supported in the destination:"
433 " %s") % (', '.join(sorted(missing)))
433 " %s") % (', '.join(sorted(missing)))
434 raise error.Abort(msg)
434 raise error.Abort(msg)
435
435
436 if not pushop.remote.canpush():
436 if not pushop.remote.canpush():
437 raise error.Abort(_("destination does not support push"))
437 raise error.Abort(_("destination does not support push"))
438
438
439 if not pushop.remote.capable('unbundle'):
439 if not pushop.remote.capable('unbundle'):
440 raise error.Abort(_('cannot push: destination does not support the '
440 raise error.Abort(_('cannot push: destination does not support the '
441 'unbundle wire protocol command'))
441 'unbundle wire protocol command'))
442
442
443 # get lock as we might write phase data
443 # get lock as we might write phase data
444 wlock = lock = None
444 wlock = lock = None
445 try:
445 try:
446 # bundle2 push may receive a reply bundle touching bookmarks or other
446 # bundle2 push may receive a reply bundle touching bookmarks or other
447 # things requiring the wlock. Take it now to ensure proper ordering.
447 # things requiring the wlock. Take it now to ensure proper ordering.
448 maypushback = pushop.ui.configbool('experimental', 'bundle2.pushback')
448 maypushback = pushop.ui.configbool('experimental', 'bundle2.pushback')
449 if (not _forcebundle1(pushop)) and maypushback:
449 if (not _forcebundle1(pushop)) and maypushback:
450 wlock = pushop.repo.wlock()
450 wlock = pushop.repo.wlock()
451 lock = pushop.repo.lock()
451 lock = pushop.repo.lock()
452 pushop.trmanager = transactionmanager(pushop.repo,
452 pushop.trmanager = transactionmanager(pushop.repo,
453 'push-response',
453 'push-response',
454 pushop.remote.url())
454 pushop.remote.url())
455 except IOError as err:
455 except IOError as err:
456 if err.errno != errno.EACCES:
456 if err.errno != errno.EACCES:
457 raise
457 raise
458 # source repo cannot be locked.
458 # source repo cannot be locked.
459 # We do not abort the push, but just disable the local phase
459 # We do not abort the push, but just disable the local phase
460 # synchronisation.
460 # synchronisation.
461 msg = 'cannot lock source repository: %s\n' % err
461 msg = 'cannot lock source repository: %s\n' % err
462 pushop.ui.debug(msg)
462 pushop.ui.debug(msg)
463
463
464 with wlock or util.nullcontextmanager(), \
464 with wlock or util.nullcontextmanager(), \
465 lock or util.nullcontextmanager(), \
465 lock or util.nullcontextmanager(), \
466 pushop.trmanager or util.nullcontextmanager():
466 pushop.trmanager or util.nullcontextmanager():
467 pushop.repo.checkpush(pushop)
467 pushop.repo.checkpush(pushop)
468 _pushdiscovery(pushop)
468 _pushdiscovery(pushop)
469 if not _forcebundle1(pushop):
469 if not _forcebundle1(pushop):
470 _pushbundle2(pushop)
470 _pushbundle2(pushop)
471 _pushchangeset(pushop)
471 _pushchangeset(pushop)
472 _pushsyncphase(pushop)
472 _pushsyncphase(pushop)
473 _pushobsolete(pushop)
473 _pushobsolete(pushop)
474 _pushbookmark(pushop)
474 _pushbookmark(pushop)
475
475
476 return pushop
476 return pushop
477
477
478 # list of steps to perform discovery before push
478 # list of steps to perform discovery before push
479 pushdiscoveryorder = []
479 pushdiscoveryorder = []
480
480
481 # Mapping between step name and function
481 # Mapping between step name and function
482 #
482 #
483 # This exists to help extensions wrap steps if necessary
483 # This exists to help extensions wrap steps if necessary
484 pushdiscoverymapping = {}
484 pushdiscoverymapping = {}
485
485
486 def pushdiscovery(stepname):
486 def pushdiscovery(stepname):
487 """decorator for function performing discovery before push
487 """decorator for function performing discovery before push
488
488
489 The function is added to the step -> function mapping and appended to the
489 The function is added to the step -> function mapping and appended to the
490 list of steps. Beware that decorated function will be added in order (this
490 list of steps. Beware that decorated function will be added in order (this
491 may matter).
491 may matter).
492
492
493 You can only use this decorator for a new step, if you want to wrap a step
493 You can only use this decorator for a new step, if you want to wrap a step
494 from an extension, change the pushdiscovery dictionary directly."""
494 from an extension, change the pushdiscovery dictionary directly."""
495 def dec(func):
495 def dec(func):
496 assert stepname not in pushdiscoverymapping
496 assert stepname not in pushdiscoverymapping
497 pushdiscoverymapping[stepname] = func
497 pushdiscoverymapping[stepname] = func
498 pushdiscoveryorder.append(stepname)
498 pushdiscoveryorder.append(stepname)
499 return func
499 return func
500 return dec
500 return dec
501
501
502 def _pushdiscovery(pushop):
502 def _pushdiscovery(pushop):
503 """Run all discovery steps"""
503 """Run all discovery steps"""
504 for stepname in pushdiscoveryorder:
504 for stepname in pushdiscoveryorder:
505 step = pushdiscoverymapping[stepname]
505 step = pushdiscoverymapping[stepname]
506 step(pushop)
506 step(pushop)
507
507
508 @pushdiscovery('changeset')
508 @pushdiscovery('changeset')
509 def _pushdiscoverychangeset(pushop):
509 def _pushdiscoverychangeset(pushop):
510 """discover the changeset that need to be pushed"""
510 """discover the changeset that need to be pushed"""
511 fci = discovery.findcommonincoming
511 fci = discovery.findcommonincoming
512 commoninc = fci(pushop.repo, pushop.remote, force=pushop.force)
512 commoninc = fci(pushop.repo, pushop.remote, force=pushop.force)
513 common, inc, remoteheads = commoninc
513 common, inc, remoteheads = commoninc
514 fco = discovery.findcommonoutgoing
514 fco = discovery.findcommonoutgoing
515 outgoing = fco(pushop.repo, pushop.remote, onlyheads=pushop.revs,
515 outgoing = fco(pushop.repo, pushop.remote, onlyheads=pushop.revs,
516 commoninc=commoninc, force=pushop.force)
516 commoninc=commoninc, force=pushop.force)
517 pushop.outgoing = outgoing
517 pushop.outgoing = outgoing
518 pushop.remoteheads = remoteheads
518 pushop.remoteheads = remoteheads
519 pushop.incoming = inc
519 pushop.incoming = inc
520
520
@pushdiscovery('phase')
def _pushdiscoveryphase(pushop):
    """discover the phases that need to be pushed

    (computed for both the success and failure cases of the changesets push)"""
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    remotephases = pushop.remote.listkeys('phases')
    publishing = remotephases.get('publishing', False)
    if (pushop.ui.configbool('ui', '_usedassubrepo')
        and remotephases    # server supports phases
        and not pushop.outgoing.missing # no changesets to be pushed
        and publishing):
        # When:
        # - this is a subrepo push
        # - and the remote supports phases
        # - and no changesets are to be pushed
        # - and the remote is publishing
        # We may be in issue 3871 case!
        # We drop the possible phase synchronisation done by
        # courtesy to publish changesets possibly locally draft
        # on the remote.
        remotephases = {'publishing': 'True'}
    ana = phases.analyzeremotephases(pushop.repo,
                                     pushop.fallbackheads,
                                     remotephases)
    pheads, droots = ana
    extracond = ''
    if not publishing:
        extracond = ' and public()'
    revset = 'heads((%%ln::%%ln) %s)' % extracond
    # Get the list of all revs that are draft on the remote but public here.
    # XXX Beware that the revset breaks if droots is not strictly roots;
    # XXX we may want to ensure that it is, but that is costly.
    fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
    if not outgoing.missing:
        future = fallback
    else:
        # adds the changesets we are going to push as draft
        #
        # should not be necessary for a publishing server, but because of an
        # issue fixed in xxxxx we have to do it anyway.
        fdroots = list(unfi.set('roots(%ln + %ln::)',
                       outgoing.missing, droots))
        fdroots = [f.node() for f in fdroots]
        future = list(unfi.set(revset, fdroots, pushop.futureheads))
    pushop.outdatedphases = future
    pushop.fallbackoutdatedphases = fallback

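# Worked example (hypothetical, using the same revset template as above):
# with extracond == ' and public()' the template expands to
# 'heads((%ln::%ln) and public())', where each '%ln' substitutes a list of
# binary nodes, so querying the public heads between two node lists reads:
#
#   heads = list(unfi.set('heads((%ln::%ln) and public())',
#                         droots, pushop.fallbackheads))
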
@pushdiscovery('obsmarker')
def _pushdiscoveryobsmarkers(pushop):
    if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
        and pushop.repo.obsstore
        and 'obsolete' in pushop.remote.listkeys('namespaces')):
        repo = pushop.repo
        # very naive computation that can be quite expensive on big repos.
        # However: evolution is currently slow on them anyway.
        nodes = (c.node() for c in repo.set('::%ln', pushop.futureheads))
        pushop.outobsmarkers = pushop.repo.obsstore.relevantmarkers(nodes)

@pushdiscovery('bookmarks')
def _pushdiscoverybookmarks(pushop):
    ui = pushop.ui
    repo = pushop.repo.unfiltered()
    remote = pushop.remote
    ui.debug("checking for updated bookmarks\n")
    ancestors = ()
    if pushop.revs:
        revnums = map(repo.changelog.rev, pushop.revs)
        ancestors = repo.changelog.ancestors(revnums, inclusive=True)
    remotebookmark = remote.listkeys('bookmarks')

    explicit = set([repo._bookmarks.expandname(bookmark)
                    for bookmark in pushop.bookmarks])

    remotebookmark = bookmod.unhexlifybookmarks(remotebookmark)
    comp = bookmod.comparebookmarks(repo, repo._bookmarks, remotebookmark)

    def safehex(x):
        if x is None:
            return x
        return hex(x)

    def hexifycompbookmarks(bookmarks):
        for b, scid, dcid in bookmarks:
            yield b, safehex(scid), safehex(dcid)

    comp = [hexifycompbookmarks(marks) for marks in comp]
    addsrc, adddst, advsrc, advdst, diverge, differ, invalid, same = comp

    for b, scid, dcid in advsrc:
        if b in explicit:
            explicit.remove(b)
        if not ancestors or repo[scid].rev() in ancestors:
            pushop.outbookmarks.append((b, dcid, scid))
    # search added bookmark
    for b, scid, dcid in addsrc:
        if b in explicit:
            explicit.remove(b)
        pushop.outbookmarks.append((b, '', scid))
    # search for overwritten bookmark
    for b, scid, dcid in list(advdst) + list(diverge) + list(differ):
        if b in explicit:
            explicit.remove(b)
        pushop.outbookmarks.append((b, dcid, scid))
    # search for bookmark to delete
    for b, scid, dcid in adddst:
        if b in explicit:
            explicit.remove(b)
        # treat as "deleted locally"
        pushop.outbookmarks.append((b, dcid, ''))
    # identical bookmarks shouldn't get reported
    for b, scid, dcid in same:
        if b in explicit:
            explicit.remove(b)

    if explicit:
        explicit = sorted(explicit)
        # we should probably list all of them
        ui.warn(_('bookmark %s does not exist on the local '
                  'or remote repository!\n') % explicit[0])
        pushop.bkresult = 2

    pushop.outbookmarks.sort()

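# Reference sketch (semantics inferred from the unpacking above; treat it as
# an assumption): comparebookmarks() yields eight lists of (name, scid, dcid)
# tuples -- addsrc/adddst (present on one side only), advsrc/advdst (one side
# is strictly ahead), diverge, differ, invalid and same. For instance, a
# bookmark that exists only locally lands in addsrc and is queued with an
# empty old value:
#
#   pushop.outbookmarks.append(('feature-x', '', scid))
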
def _pushcheckoutgoing(pushop):
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    if not outgoing.missing:
        # nothing to push
        scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
        return False
    # something to push
    if not pushop.force:
        # if repo.obsstore is empty there are no obsolete changesets;
        # checking it first saves the iteration below
        if unfi.obsstore:
            # these messages are assigned to short names for the 80-char limit
            mso = _("push includes obsolete changeset: %s!")
            mspd = _("push includes phase-divergent changeset: %s!")
            mscd = _("push includes content-divergent changeset: %s!")
            mst = {"orphan": _("push includes orphan changeset: %s!"),
                   "phase-divergent": mspd,
                   "content-divergent": mscd}
            # If there is at least one obsolete or unstable changeset among
            # the ones we are about to push, then at least one of the
            # missing heads will be obsolete or unstable, so checking the
            # heads only is ok.
            for node in outgoing.missingheads:
                ctx = unfi[node]
                if ctx.obsolete():
                    raise error.Abort(mso % ctx)
                elif ctx.isunstable():
                    # TODO print more than one instability in the abort
                    # message
                    raise error.Abort(mst[ctx.instabilities()[0]] % ctx)

    discovery.checkheads(pushop)
    return True

# List of names of steps to perform for an outgoing bundle2, order matters.
b2partsgenorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
b2partsgenmapping = {}

def b2partsgenerator(stepname, idx=None):
    """decorator for functions generating bundle2 parts

    The function is added to the step -> function mapping and appended to the
    list of steps. Beware that decorated functions will be added in order
    (this may matter).

    You can only use this decorator for new steps; if you want to wrap a step
    from an extension, change the b2partsgenmapping dictionary directly."""
    def dec(func):
        assert stepname not in b2partsgenmapping
        b2partsgenmapping[stepname] = func
        if idx is None:
            b2partsgenorder.append(stepname)
        else:
            b2partsgenorder.insert(idx, stepname)
        return func
    return dec

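# Sketch (assumed extension code, not part of the original module): wrapping
# the existing 'changeset' part generator through the mapping, as the
# docstring above recommends:
#
#   origgen = b2partsgenmapping['changeset']
#
#   def wrappedgen(pushop, bundler):
#       pushop.ui.debug('about to generate the changeset part\n')
#       return origgen(pushop, bundler)
#
#   b2partsgenmapping['changeset'] = wrappedgen
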
def _pushb2ctxcheckheads(pushop, bundler):
    """Generate race condition checking parts

    Exists as an independent function to aid extensions
    """
    # * 'force' does not check for push races,
    # * if we don't push anything, there is nothing to check.
    if not pushop.force and pushop.outgoing.missingheads:
        allowunrelated = 'related' in bundler.capabilities.get('checkheads', ())
        emptyremote = pushop.pushbranchmap is None
        if not allowunrelated or emptyremote:
            bundler.newpart('check:heads', data=iter(pushop.remoteheads))
        else:
            affected = set()
            for branch, heads in pushop.pushbranchmap.iteritems():
                remoteheads, newheads, unsyncedheads, discardedheads = heads
                if remoteheads is not None:
                    remote = set(remoteheads)
                    affected |= set(discardedheads) & remote
                    affected |= remote - set(newheads)
            if affected:
                data = iter(sorted(affected))
                bundler.newpart('check:updated-heads', data=data)

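# Worked example (hypothetical values, for illustration only): if a branch
# currently has remote heads {A, B}, the push would leave {B, C}, and A was
# discarded during discovery, then:
#
#   affected |= {A} & {A, B}       # discarded heads still on the remote
#   affected |= {A, B} - {B, C}    # remote heads the push makes obsolete
#
# so A ends up listed in the 'check:updated-heads' part.
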
@b2partsgenerator('changeset')
def _pushb2ctx(pushop, bundler):
    """handle changegroup push through bundle2

    addchangegroup result is stored in the ``pushop.cgresult`` attribute.
    """
    if 'changesets' in pushop.stepsdone:
        return
    pushop.stepsdone.add('changesets')
    # Send known heads to the server for race detection.
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop)

    _pushb2ctxcheckheads(pushop, bundler)

    b2caps = bundle2.bundle2caps(pushop.remote)
    version = '01'
    cgversions = b2caps.get('changegroup')
    if cgversions: # 3.1 and 3.2 ship with an empty value
        cgversions = [v for v in cgversions
                      if v in changegroup.supportedoutgoingversions(
                          pushop.repo)]
        if not cgversions:
            raise ValueError(_('no common changegroup version'))
        version = max(cgversions)
    cgstream = changegroup.makestream(pushop.repo, pushop.outgoing, version,
                                      'push')
    cgpart = bundler.newpart('changegroup', data=cgstream)
    if cgversions:
        cgpart.addparam('version', version)
    if 'treemanifest' in pushop.repo.requirements:
        cgpart.addparam('treemanifest', '1')
    def handlereply(op):
        """extract addchangegroup returns from server reply"""
        cgreplies = op.records.getreplies(cgpart.id)
        assert len(cgreplies['changegroup']) == 1
        pushop.cgresult = cgreplies['changegroup'][0]['return']
    return handlereply

@b2partsgenerator('phase')
def _pushb2phases(pushop, bundler):
    """handle phase push through bundle2"""
    if 'phases' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    if 'pushkey' not in b2caps:
        return
    pushop.stepsdone.add('phases')
    part2node = []

    def handlefailure(pushop, exc):
        targetid = int(exc.partid)
        for partid, node in part2node:
            if partid == targetid:
                raise error.Abort(_('updating %s to public failed') % node)

    enc = pushkey.encode
    for newremotehead in pushop.outdatedphases:
        part = bundler.newpart('pushkey')
        part.addparam('namespace', enc('phases'))
        part.addparam('key', enc(newremotehead.hex()))
        part.addparam('old', enc('%d' % phases.draft))
        part.addparam('new', enc('%d' % phases.public))
        part2node.append((part.id, newremotehead))
        pushop.pkfailcb[part.id] = handlefailure

    def handlereply(op):
        for partid, node in part2node:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            assert len(results) <= 1
            msg = None
            if not results:
                msg = _('server ignored update of %s to public!\n') % node
            elif not int(results[0]['return']):
                msg = _('updating %s to public failed!\n') % node
            if msg is not None:
                pushop.ui.warn(msg)
    return handlereply

@b2partsgenerator('obsmarkers')
def _pushb2obsmarkers(pushop, bundler):
    if 'obsmarkers' in pushop.stepsdone:
        return
    remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
    if obsolete.commonversion(remoteversions) is None:
        return
    pushop.stepsdone.add('obsmarkers')
    if pushop.outobsmarkers:
        markers = sorted(pushop.outobsmarkers)
        bundle2.buildobsmarkerspart(bundler, markers)

@b2partsgenerator('bookmarks')
def _pushb2bookmarks(pushop, bundler):
    """handle bookmark push through bundle2"""
    if 'bookmarks' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    if 'pushkey' not in b2caps:
        return
    pushop.stepsdone.add('bookmarks')
    part2book = []
    enc = pushkey.encode

    def handlefailure(pushop, exc):
        targetid = int(exc.partid)
        for partid, book, action in part2book:
            if partid == targetid:
                raise error.Abort(bookmsgmap[action][1].rstrip() % book)
        # we should not be called for a part we did not generate
        assert False

    for book, old, new in pushop.outbookmarks:
        part = bundler.newpart('pushkey')
        part.addparam('namespace', enc('bookmarks'))
        part.addparam('key', enc(book))
        part.addparam('old', enc(old))
        part.addparam('new', enc(new))
        action = 'update'
        if not old:
            action = 'export'
        elif not new:
            action = 'delete'
        part2book.append((part.id, book, action))
        pushop.pkfailcb[part.id] = handlefailure

    def handlereply(op):
        ui = pushop.ui
        for partid, book, action in part2book:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            assert len(results) <= 1
            if not results:
                pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
            else:
                ret = int(results[0]['return'])
                if ret:
                    ui.status(bookmsgmap[action][0] % book)
                else:
                    ui.warn(bookmsgmap[action][1] % book)
                    if pushop.bkresult is not None:
                        pushop.bkresult = 1
    return handlereply

@b2partsgenerator('pushvars', idx=0)
def _getbundlesendvars(pushop, bundler):
    '''send shellvars via bundle2'''
    pushvars = pushop.pushvars
    if pushvars:
        shellvars = {}
        for raw in pushvars:
            if '=' not in raw:
                msg = ("unable to parse variable '%s', should follow "
                       "'KEY=VALUE' or 'KEY=' format")
                raise error.Abort(msg % raw)
            k, v = raw.split('=', 1)
            shellvars[k] = v

        part = bundler.newpart('pushvars')

        for key, value in shellvars.iteritems():
            part.addparam(key, value, mandatory=False)

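# Usage sketch (--pushvars is an experimental flag; the hook wiring described
# here is an assumption based on the KEY=VALUE format enforced above):
# variables sent this way surface to server-side hooks as HG_USERVAR_*
# environment variables, e.g.:
#
#   hg push --pushvars "DEBUG=1" --pushvars "BYPASS_REVIEW="
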
def _pushbundle2(pushop):
    """push data to the remote using bundle2

    The only currently supported type of data is changegroup but this will
    evolve in the future."""
    bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
    pushback = (pushop.trmanager
                and pushop.ui.configbool('experimental', 'bundle2.pushback'))

    # create reply capability
    capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo,
                                                      allowpushback=pushback))
    bundler.newpart('replycaps', data=capsblob)
    replyhandlers = []
    for partgenname in b2partsgenorder:
        partgen = b2partsgenmapping[partgenname]
        ret = partgen(pushop, bundler)
        if callable(ret):
            replyhandlers.append(ret)
    # do not push if nothing to push
    if bundler.nbparts <= 1:
        return
    stream = util.chunkbuffer(bundler.getchunks())
    try:
        try:
            reply = pushop.remote.unbundle(
                stream, ['force'], pushop.remote.url())
        except error.BundleValueError as exc:
            raise error.Abort(_('missing support for %s') % exc)
        try:
            trgetter = None
            if pushback:
                trgetter = pushop.trmanager.transaction
            op = bundle2.processbundle(pushop.repo, reply, trgetter)
        except error.BundleValueError as exc:
            raise error.Abort(_('missing support for %s') % exc)
        except bundle2.AbortFromPart as exc:
            pushop.ui.status(_('remote: %s\n') % exc)
            if exc.hint is not None:
                pushop.ui.status(_('remote: %s\n') % ('(%s)' % exc.hint))
            raise error.Abort(_('push failed on remote'))
    except error.PushkeyFailed as exc:
        partid = int(exc.partid)
        if partid not in pushop.pkfailcb:
            raise
        pushop.pkfailcb[partid](pushop, exc)
    for rephand in replyhandlers:
        rephand(op)

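# Sketch (hypothetical step and part names, not in the original module): the
# loop above treats a callable returned by a part generator as a reply
# handler, so a minimal extension step would follow this shape:
#
#   @b2partsgenerator('example')
#   def _pushb2example(pushop, bundler):
#       part = bundler.newpart('example:part', mandatory=False)
#       def handlereply(op):
#           pushop.ui.debug('got replies for part %d\n' % part.id)
#       return handlereply
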
def _pushchangeset(pushop):
    """Make the actual push of changeset bundle to remote repo"""
    if 'changesets' in pushop.stepsdone:
        return
    pushop.stepsdone.add('changesets')
    if not _pushcheckoutgoing(pushop):
        return

    # Should have verified this in push().
    assert pushop.remote.capable('unbundle')

    pushop.repo.prepushoutgoinghooks(pushop)
    outgoing = pushop.outgoing
    # TODO: get bundlecaps from remote
    bundlecaps = None
    # create a changegroup from local
    if pushop.revs is None and not (outgoing.excluded
                                    or pushop.repo.changelog.filteredrevs):
        # push everything,
        # use the fast path, no race possible on push
        cg = changegroup.makechangegroup(pushop.repo, outgoing, '01', 'push',
                                         fastpath=True, bundlecaps=bundlecaps)
    else:
        cg = changegroup.makechangegroup(pushop.repo, outgoing, '01',
                                         'push', bundlecaps=bundlecaps)

    # apply changegroup to remote
    # local repo finds heads on server, finds out what
    # revs it must push. once revs transferred, if server
    # finds it has different heads (someone else won
    # commit/push race), server aborts.
    if pushop.force:
        remoteheads = ['force']
    else:
        remoteheads = pushop.remoteheads
    # ssh: return remote's addchangegroup()
    # http: return remote's addchangegroup() or 0 for error
    pushop.cgresult = pushop.remote.unbundle(cg, remoteheads,
                                             pushop.repo.url())

def _pushsyncphase(pushop):
    """synchronise phase information locally and remotely"""
    cheads = pushop.commonheads
    # even when we don't push, exchanging phase data is useful
    remotephases = pushop.remote.listkeys('phases')
    if (pushop.ui.configbool('ui', '_usedassubrepo')
        and remotephases    # server supports phases
        and pushop.cgresult is None # nothing was pushed
        and remotephases.get('publishing', False)):
        # When:
        # - this is a subrepo push
        # - and the remote supports phases
        # - and no changeset was pushed
        # - and the remote is publishing
        # We may be in issue 3871 case!
        # We drop the possible phase synchronisation done by
        # courtesy to publish changesets possibly locally draft
        # on the remote.
        remotephases = {'publishing': 'True'}
    if not remotephases: # old server or public only reply from non-publishing
        _localphasemove(pushop, cheads)
        # don't push any phase data as there is nothing to push
    else:
        ana = phases.analyzeremotephases(pushop.repo, cheads,
                                         remotephases)
        pheads, droots = ana
        ### Apply remote phase on local
        if remotephases.get('publishing', False):
            _localphasemove(pushop, cheads)
        else: # publish = False
            _localphasemove(pushop, pheads)
            _localphasemove(pushop, cheads, phases.draft)
        ### Apply local phase on remote

        if pushop.cgresult:
            if 'phases' in pushop.stepsdone:
                # phases already pushed through bundle2
                return
            outdated = pushop.outdatedphases
        else:
            outdated = pushop.fallbackoutdatedphases

        pushop.stepsdone.add('phases')

        # filter heads already turned public by the push
        outdated = [c for c in outdated if c.node() not in pheads]
        # fallback to independent pushkey command
        for newremotehead in outdated:
            r = pushop.remote.pushkey('phases',
                                      newremotehead.hex(),
                                      str(phases.draft),
                                      str(phases.public))
            if not r:
                pushop.ui.warn(_('updating %s to public failed!\n')
                               % newremotehead)

def _localphasemove(pushop, nodes, phase=phases.public):
    """move <nodes> to <phase> in the local source repo"""
    if pushop.trmanager:
        phases.advanceboundary(pushop.repo,
                               pushop.trmanager.transaction(),
                               phase,
                               nodes)
    else:
        # repo is not locked, do not change any phases!
        # Informs the user that phases should have been moved when
        # applicable.
        actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
        phasestr = phases.phasenames[phase]
        if actualmoves:
            pushop.ui.status(_('cannot lock source repo, skipping '
                               'local %s phase update\n') % phasestr)

def _pushobsolete(pushop):
    """utility function to push obsolete markers to a remote"""
    if 'obsmarkers' in pushop.stepsdone:
        return
    repo = pushop.repo
    remote = pushop.remote
    pushop.stepsdone.add('obsmarkers')
    if pushop.outobsmarkers:
        pushop.ui.debug('try to push obsolete markers to remote\n')
        rslts = []
        remotedata = obsolete._pushkeyescape(sorted(pushop.outobsmarkers))
        for key in sorted(remotedata, reverse=True):
            # reverse sort to ensure we end with dump0
            data = remotedata[key]
            rslts.append(remote.pushkey('obsolete', key, '', data))
        if [r for r in rslts if not r]:
            msg = _('failed to push some obsolete markers!\n')
            repo.ui.warn(msg)

def _pushbookmark(pushop):
    """Update bookmark position on remote"""
    if pushop.cgresult == 0 or 'bookmarks' in pushop.stepsdone:
        return
    pushop.stepsdone.add('bookmarks')
    ui = pushop.ui
    remote = pushop.remote

    for b, old, new in pushop.outbookmarks:
        action = 'update'
        if not old:
            action = 'export'
        elif not new:
            action = 'delete'
        if remote.pushkey('bookmarks', b, old, new):
            ui.status(bookmsgmap[action][0] % b)
        else:
            ui.warn(bookmsgmap[action][1] % b)
            # discovery can have set the value from an invalid entry
            if pushop.bkresult is not None:
                pushop.bkresult = 1

class pulloperation(object):
    """An object that represents a single pull operation.

    Its purpose is to carry pull-related state and very common operations.

    A new one should be created at the beginning of each pull and discarded
    afterward.
    """

    def __init__(self, repo, remote, heads=None, force=False, bookmarks=(),
                 remotebookmarks=None, streamclonerequested=None):
        # repo we pull into
        self.repo = repo
        # repo we pull from
        self.remote = remote
        # revision we try to pull (None is "all")
        self.heads = heads
        # bookmarks pulled explicitly
        self.explicitbookmarks = [repo._bookmarks.expandname(bookmark)
                                  for bookmark in bookmarks]
        # do we force pull?
        self.force = force
        # whether a streaming clone was requested
        self.streamclonerequested = streamclonerequested
        # transaction manager
        self.trmanager = None
        # set of common changesets between local and remote before pull
        self.common = None
        # set of pulled heads
        self.rheads = None
        # list of missing changesets to fetch remotely
        self.fetch = None
        # remote bookmarks data
        self.remotebookmarks = remotebookmarks
        # result of changegroup pulling (used as return code by pull)
        self.cgresult = None
        # list of steps already done
        self.stepsdone = set()
        # Whether we attempted a clone from pre-generated bundles.
        self.clonebundleattempted = False

    @util.propertycache
    def pulledsubset(self):
        """heads of the set of changesets targeted by the pull"""
        # compute target subset
        if self.heads is None:
            # We pulled everything possible,
            # sync on everything common
            c = set(self.common)
            ret = list(self.common)
            for n in self.rheads:
                if n not in c:
                    ret.append(n)
            return ret
        else:
            # We pulled a specific subset,
            # sync on this subset
            return self.heads

    @util.propertycache
    def canusebundle2(self):
        return not _forcebundle1(self)

    @util.propertycache
    def remotebundle2caps(self):
        return bundle2.bundle2caps(self.remote)

    def gettransaction(self):
        # deprecated; talk to trmanager directly
        return self.trmanager.transaction()

class transactionmanager(util.transactional):
    """An object to manage the life cycle of a transaction

    It creates the transaction on demand and calls the appropriate hooks when
    closing the transaction."""
    def __init__(self, repo, source, url):
        self.repo = repo
        self.source = source
        self.url = url
        self._tr = None

    def transaction(self):
        """Return an open transaction object, constructing if necessary"""
        if not self._tr:
            trname = '%s\n%s' % (self.source, util.hidepassword(self.url))
            self._tr = self.repo.transaction(trname)
            self._tr.hookargs['source'] = self.source
            self._tr.hookargs['url'] = self.url
        return self._tr

    def close(self):
        """close transaction if created"""
        if self._tr is not None:
            self._tr.close()

    def release(self):
        """release transaction if created"""
        if self._tr is not None:
            self._tr.release()

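# Usage pattern (a sketch mirroring pull() below; repo and remote stand for a
# local repository and a peer):
#
#   tm = transactionmanager(repo, 'pull', remote.url())
#   try:
#       tm.transaction()   # opened lazily, on first use
#       ...                # apply the incoming data
#       tm.close()         # commit and fire the relevant hooks
#   finally:
#       tm.release()       # roll back if close() was never reached
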
def pull(repo, remote, heads=None, force=False, bookmarks=(), opargs=None,
         streamclonerequested=None):
    """Fetch repository data from a remote.

    This is the main function used to retrieve data from a remote repository.

    ``repo`` is the local repository to clone into.
    ``remote`` is a peer instance.
    ``heads`` is an iterable of revisions we want to pull. ``None`` (the
    default) means to pull everything from the remote.
    ``bookmarks`` is an iterable of bookmarks requested to be pulled. By
    default, all remote bookmarks are pulled.
    ``opargs`` are additional keyword arguments to pass to ``pulloperation``
    initialization.
    ``streamclonerequested`` is a boolean indicating whether a "streaming
    clone" is requested. A "streaming clone" is essentially a raw file copy
    of revlogs from the server. This only works when the local repository is
    empty. The default value of ``None`` means to respect the server
    configuration for preferring stream clones.

    Returns the ``pulloperation`` created for this pull.
    """
    if opargs is None:
        opargs = {}
    pullop = pulloperation(repo, remote, heads, force, bookmarks=bookmarks,
                           streamclonerequested=streamclonerequested, **opargs)

    peerlocal = pullop.remote.local()
    if peerlocal:
        missing = set(peerlocal.requirements) - pullop.repo.supported
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise error.Abort(msg)

    wlock = lock = None
    try:
        wlock = pullop.repo.wlock()
        lock = pullop.repo.lock()
        pullop.trmanager = transactionmanager(repo, 'pull', remote.url())
        streamclone.maybeperformlegacystreamclone(pullop)
        # This should ideally be in _pullbundle2(). However, it needs to run
        # before discovery to avoid extra work.
        _maybeapplyclonebundle(pullop)
        _pulldiscovery(pullop)
        if pullop.canusebundle2:
            _pullbundle2(pullop)
        _pullchangeset(pullop)
        _pullphase(pullop)
        _pullbookmarks(pullop)
        _pullobsolete(pullop)
        pullop.trmanager.close()
    finally:
        lockmod.release(pullop.trmanager, lock, wlock)

    return pullop

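# Calling sketch (hg.peer() is the usual way to obtain a peer; the URL and
# the surrounding setup are assumptions):
#
#   from mercurial import hg, exchange
#
#   other = hg.peer(repo, {}, 'https://example.com/repo')
#   pullop = exchange.pull(repo, other, heads=None)
#   if pullop.cgresult:
#       repo.ui.status('pulled changesets\n')
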
# list of steps to perform discovery before pull
pulldiscoveryorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
pulldiscoverymapping = {}

def pulldiscovery(stepname):
    """decorator for functions performing discovery before pull

    The function is added to the step -> function mapping and appended to the
    list of steps. Beware that decorated functions will be added in order
    (this may matter).

    You can only use this decorator for a new step; if you want to wrap a step
    from an extension, change the pulldiscoverymapping dictionary directly."""
    def dec(func):
        assert stepname not in pulldiscoverymapping
        pulldiscoverymapping[stepname] = func
        pulldiscoveryorder.append(stepname)
        return func
    return dec

def _pulldiscovery(pullop):
    """Run all discovery steps"""
    for stepname in pulldiscoveryorder:
        step = pulldiscoverymapping[stepname]
        step(pullop)

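# Sketch (assumed extension code, not part of the module): wrapping the
# built-in 'changegroup' discovery step through the mapping, as the
# docstring above recommends:
#
#   origstep = pulldiscoverymapping['changegroup']
#
#   def wrappedstep(pullop):
#       origstep(pullop)
#       pullop.repo.ui.debug('%d nodes to fetch\n' % len(pullop.fetch))
#
#   pulldiscoverymapping['changegroup'] = wrappedstep
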
@pulldiscovery('b1:bookmarks')
def _pullbookmarkbundle1(pullop):
    """fetch bookmark data in bundle1 case

    If not using bundle2, we have to fetch bookmarks before changeset
    discovery to reduce the chance and impact of race conditions."""
    if pullop.remotebookmarks is not None:
        return
    if pullop.canusebundle2 and 'listkeys' in pullop.remotebundle2caps:
        # all known bundle2 servers now support listkeys, but let's be nice
        # with new implementations.
        return
    pullop.remotebookmarks = pullop.remote.listkeys('bookmarks')


@pulldiscovery('changegroup')
def _pulldiscoverychangegroup(pullop):
    """discovery phase for the pull

    Currently handles changeset discovery only; it will eventually handle
    all discovery."""
    tmp = discovery.findcommonincoming(pullop.repo,
                                       pullop.remote,
                                       heads=pullop.heads,
                                       force=pullop.force)
    common, fetch, rheads = tmp
    nm = pullop.repo.unfiltered().changelog.nodemap
    if fetch and rheads:
        # If a remote head is filtered locally, let's drop it from the
        # unknown remote heads and put it back in common.
        #
        # This is a hackish solution to catch most of the "common but
        # locally hidden" situations. We do not perform discovery on an
        # unfiltered repository because it would end up doing a pathological
        # amount of round trips for a huge amount of changesets we do not
        # care about.
        #
        # If a set of such "common but filtered" changesets exists on the
        # server but none of them is a remote head, we'll not be able to
        # detect it.
        scommon = set(common)
        filteredrheads = []
        for n in rheads:
            if n in nm:
                if n not in scommon:
                    common.append(n)
            else:
                filteredrheads.append(n)
        if not filteredrheads:
            fetch = []
        rheads = filteredrheads
    pullop.common = common
    pullop.fetch = fetch
    pullop.rheads = rheads

def _pullbundle2(pullop):
    """pull data using bundle2

    For now, the only supported data are changegroups."""
    kwargs = {'bundlecaps': caps20to10(pullop.repo)}

    # At the moment we don't do stream clones over bundle2. If that is
    # implemented then here's where the check for that will go.
    streaming = False

    # pulling changegroup
    pullop.stepsdone.add('changegroup')

    kwargs['common'] = pullop.common
    kwargs['heads'] = pullop.heads or pullop.rheads
    kwargs['cg'] = pullop.fetch
    if 'listkeys' in pullop.remotebundle2caps:
        kwargs['listkeys'] = ['phases']
        if pullop.remotebookmarks is None:
            # make sure to always include bookmark data when migrating
            # `hg incoming --bundle` to using this function.
            kwargs['listkeys'].append('bookmarks')

    # If this is a full pull / clone and the server supports the clone bundles
    # feature, tell the server whether we attempted a clone bundle. The
    # presence of this flag indicates the client supports clone bundles. This
    # will enable the server to treat clients that support clone bundles
    # differently from those that don't.
    if (pullop.remote.capable('clonebundles')
        and pullop.heads is None and list(pullop.common) == [nullid]):
        kwargs['cbattempted'] = pullop.clonebundleattempted

    if streaming:
        pullop.repo.ui.status(_('streaming all changes\n'))
    elif not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
    else:
        if pullop.heads is None and list(pullop.common) == [nullid]:
            pullop.repo.ui.status(_("requesting all changes\n"))
    if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
        remoteversions = bundle2.obsmarkersversion(pullop.remotebundle2caps)
        if obsolete.commonversion(remoteversions) is not None:
            kwargs['obsmarkers'] = True
            pullop.stepsdone.add('obsmarkers')
    _pullbundle2extraprepare(pullop, kwargs)
    bundle = pullop.remote.getbundle('pull', **pycompat.strkwargs(kwargs))
    try:
        op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
    except bundle2.AbortFromPart as exc:
        pullop.repo.ui.status(_('remote: abort: %s\n') % exc)
        raise error.Abort(_('pull failed on remote'), hint=exc.hint)
    except error.BundleValueError as exc:
        raise error.Abort(_('missing support for %s') % exc)

    if pullop.fetch:
        pullop.cgresult = bundle2.combinechangegroupresults(op)

    # If the bundle had a phase-heads part, then phase exchange is already done
    if op.records['phase-heads']:
        pullop.stepsdone.add('phases')

    # process phase changes
    for namespace, value in op.records['listkeys']:
        if namespace == 'phases':
            _pullapplyphases(pullop, value)

    # process bookmark updates
    for namespace, value in op.records['listkeys']:
        if namespace == 'bookmarks':
            pullop.remotebookmarks = value

    # bookmark data was either already there or pulled in the bundle
    if pullop.remotebookmarks is not None:
        _pullbookmarks(pullop)

def _pullbundle2extraprepare(pullop, kwargs):
    """hook function so that extensions can extend the getbundle call"""
    pass

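# Editorial sketch (not part of the original file): one way an extension
# might use the hook above to add an extra argument to the getbundle call.
# 'myflag' and _sketchextraprepare are hypothetical names;
# extensions.wrapfunction() is Mercurial's standard wrapping helper.
#
#     from mercurial import exchange, extensions
#
#     def _sketchextraprepare(orig, pullop, kwargs):
#         kwargs['myflag'] = True  # hypothetical bundle2 argument
#         return orig(pullop, kwargs)
#
#     def extsetup(ui):
#         extensions.wrapfunction(exchange, '_pullbundle2extraprepare',
#                                 _sketchextraprepare)
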
def _pullchangeset(pullop):
    """pull changesets from the remote into the local repo"""
    # We delay opening the transaction as late as possible so we don't open
    # it for nothing and don't break a future useful rollback call.
    if 'changegroup' in pullop.stepsdone:
        return
    pullop.stepsdone.add('changegroup')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
        return
    tr = pullop.gettransaction()
    if pullop.heads is None and list(pullop.common) == [nullid]:
        pullop.repo.ui.status(_("requesting all changes\n"))
    elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
        # issue1320, avoid a race if remote changed after discovery
        pullop.heads = pullop.rheads

    if pullop.remote.capable('getbundle'):
        # TODO: get bundlecaps from remote
        cg = pullop.remote.getbundle('pull', common=pullop.common,
                                     heads=pullop.heads or pullop.rheads)
    elif pullop.heads is None:
        cg = pullop.remote.changegroup(pullop.fetch, 'pull')
    elif not pullop.remote.capable('changegroupsubset'):
        raise error.Abort(_("partial pull cannot be done because "
                            "other repository doesn't support "
                            "changegroupsubset."))
    else:
        cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
    bundleop = bundle2.applybundle(pullop.repo, cg, tr, 'pull',
                                   pullop.remote.url())
    pullop.cgresult = bundle2.combinechangegroupresults(bundleop)

def _pullphase(pullop):
    # Get phases data from the remote
    if 'phases' in pullop.stepsdone:
        return
    remotephases = pullop.remote.listkeys('phases')
    _pullapplyphases(pullop, remotephases)

def _pullapplyphases(pullop, remotephases):
    """apply phase movement from observed remote state"""
    if 'phases' in pullop.stepsdone:
        return
    pullop.stepsdone.add('phases')
    publishing = bool(remotephases.get('publishing', False))
    if remotephases and not publishing:
        # remote is new and non-publishing
        pheads, _dr = phases.analyzeremotephases(pullop.repo,
                                                 pullop.pulledsubset,
                                                 remotephases)
        dheads = pullop.pulledsubset
    else:
        # Remote is old or publishing; all common changesets
        # should be seen as public
        pheads = pullop.pulledsubset
        dheads = []
    unfi = pullop.repo.unfiltered()
    phase = unfi._phasecache.phase
    rev = unfi.changelog.nodemap.get
    public = phases.public
    draft = phases.draft

    # exclude changesets already public locally and update the others
    pheads = [pn for pn in pheads if phase(unfi, rev(pn)) > public]
    if pheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, public, pheads)

    # exclude changesets already draft locally and update the others
    dheads = [pn for pn in dheads if phase(unfi, rev(pn)) > draft]
    if dheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, draft, dheads)

def _pullbookmarks(pullop):
    """process the remote bookmark information to update the local one"""
    if 'bookmarks' in pullop.stepsdone:
        return
    pullop.stepsdone.add('bookmarks')
    repo = pullop.repo
    remotebookmarks = pullop.remotebookmarks
    remotebookmarks = bookmod.unhexlifybookmarks(remotebookmarks)
    bookmod.updatefromremote(repo.ui, repo, remotebookmarks,
                             pullop.remote.url(),
                             pullop.gettransaction,
                             explicit=pullop.explicitbookmarks)

def _pullobsolete(pullop):
    """utility function to pull obsolete markers from a remote

    `gettransaction` is a function that returns the pull transaction,
    creating one if necessary. We return the transaction to inform the
    calling code that a new transaction has been created (when applicable).

    Exists mostly to allow overriding for experimentation purposes."""
    if 'obsmarkers' in pullop.stepsdone:
        return
    pullop.stepsdone.add('obsmarkers')
    tr = None
    if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
        pullop.repo.ui.debug('fetching remote obsolete markers\n')
        remoteobs = pullop.remote.listkeys('obsolete')
        if 'dump0' in remoteobs:
            tr = pullop.gettransaction()
            markers = []
            for key in sorted(remoteobs, reverse=True):
                if key.startswith('dump'):
                    data = util.b85decode(remoteobs[key])
                    version, newmarks = obsolete._readmarkers(data)
                    markers += newmarks
            if markers:
                pullop.repo.obsstore.add(tr, markers)
            pullop.repo.invalidatevolatilesets()
    return tr

def caps20to10(repo):
    """return a set with appropriate options to use bundle20 during getbundle"""
    caps = {'HG20'}
    capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
    caps.add('bundle2=' + urlreq.quote(capsblob))
    return caps

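# Editorial note (not part of the original file): the set returned above
# has the shape {'HG20', 'bundle2=<urlquoted capabilities blob>'}; the
# blob is the repo's encoded bundle2 capabilities string, so the exact
# value varies per repository and Mercurial version.
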
# List of names of steps to perform for a bundle2 for getbundle, order matters.
getbundle2partsorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
getbundle2partsmapping = {}

def getbundle2partsgenerator(stepname, idx=None):
    """decorator for functions generating bundle2 parts for getbundle

    The function is added to the step -> function mapping and appended to the
    list of steps. Beware that decorated functions will be added in order
    (this may matter).

    You can only use this decorator for new steps; if you want to wrap a step
    from an extension, modify the getbundle2partsmapping dictionary directly."""
    def dec(func):
        assert stepname not in getbundle2partsmapping
        getbundle2partsmapping[stepname] = func
        if idx is None:
            getbundle2partsorder.append(stepname)
        else:
            getbundle2partsorder.insert(idx, stepname)
        return func
    return dec

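# Editorial sketch (not part of the original file): registering a
# hypothetical extra part generator with the decorator above. The step
# name 'mypart' and its payload are made up; real generators receive the
# same keyword arguments as the built-in ones below.
#
#     @getbundle2partsgenerator('mypart')
#     def _getbundlemypart(bundler, repo, source, bundlecaps=None,
#                          b2caps=None, **kwargs):
#         bundler.newpart('mypart', data='payload', mandatory=False)
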
def bundle2requested(bundlecaps):
    if bundlecaps is not None:
        return any(cap.startswith('HG2') for cap in bundlecaps)
    return False

def getbundlechunks(repo, source, heads=None, common=None, bundlecaps=None,
                    **kwargs):
    """Return chunks constituting a bundle's raw data.

    Could be a bundle HG10 or a bundle HG20 depending on the bundlecaps
    passed.

    Returns an iterator over raw chunks (of varying sizes).
    """
    kwargs = pycompat.byteskwargs(kwargs)
    usebundle2 = bundle2requested(bundlecaps)
    # bundle10 case
    if not usebundle2:
        if bundlecaps and not kwargs.get('cg', True):
            raise ValueError(_('request for bundle10 must include changegroup'))

        if kwargs:
            raise ValueError(_('unsupported getbundle arguments: %s')
                             % ', '.join(sorted(kwargs.keys())))
        outgoing = _computeoutgoing(repo, heads, common)
        return changegroup.makestream(repo, outgoing, '01', source,
                                      bundlecaps=bundlecaps)

    # bundle20 case
    b2caps = {}
    for bcaps in bundlecaps:
        if bcaps.startswith('bundle2='):
            blob = urlreq.unquote(bcaps[len('bundle2='):])
            b2caps.update(bundle2.decodecaps(blob))
    bundler = bundle2.bundle20(repo.ui, b2caps)

    kwargs['heads'] = heads
    kwargs['common'] = common

    for name in getbundle2partsorder:
        func = getbundle2partsmapping[name]
        func(bundler, repo, source, bundlecaps=bundlecaps, b2caps=b2caps,
             **pycompat.strkwargs(kwargs))

    return bundler.getchunks()

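# Editorial sketch (not part of the original file): a caller such as the
# wire protocol's getbundle command consumes the returned iterator by
# streaming the chunks; concatenating them to a file is the simplest
# illustration (the raw stream carries no framing beyond what the bundler
# itself emits):
#
#     with open('out.hg', 'wb') as fh:
#         for chunk in getbundlechunks(repo, 'serve', heads=repo.heads(),
#                                      common=[nullid]):
#             fh.write(chunk)
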
@getbundle2partsgenerator('changegroup')
def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
                              b2caps=None, heads=None, common=None, **kwargs):
    """add a changegroup part to the requested bundle"""
    cgstream = None
    if kwargs.get('cg', True):
        # build changegroup bundle here.
        version = '01'
        cgversions = b2caps.get('changegroup')
        if cgversions:  # 3.1 and 3.2 ship with an empty value
            cgversions = [v for v in cgversions
                          if v in changegroup.supportedoutgoingversions(repo)]
            if not cgversions:
                raise ValueError(_('no common changegroup version'))
            version = max(cgversions)
        outgoing = _computeoutgoing(repo, heads, common)
        if outgoing.missing:
            cgstream = changegroup.makestream(repo, outgoing, version, source,
                                              bundlecaps=bundlecaps)

    if cgstream:
        part = bundler.newpart('changegroup', data=cgstream)
        if cgversions:
            part.addparam('version', version)
-        part.addparam('nbchanges', str(len(outgoing.missing)), mandatory=False)
+        part.addparam('nbchanges', '%d' % len(outgoing.missing),
+                      mandatory=False)
        if 'treemanifest' in repo.requirements:
            part.addparam('treemanifest', '1')

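# Editorial note (not part of the original file) on the -/+ change above:
# bundle2 part parameters must be bytestrings. On Python 3, str(len(...))
# yields a unicode str, while '%d' % len(outgoing.missing) stays a
# bytestring once Mercurial's source loader turns string literals into
# bytes literals, so the '%d' form behaves correctly on Python 2 and 3.
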
@getbundle2partsgenerator('listkeys')
def _getbundlelistkeysparts(bundler, repo, source, bundlecaps=None,
                            b2caps=None, **kwargs):
    """add parts containing listkeys namespaces to the requested bundle"""
    listkeys = kwargs.get('listkeys', ())
    for namespace in listkeys:
        part = bundler.newpart('listkeys')
        part.addparam('namespace', namespace)
        keys = repo.listkeys(namespace).items()
        part.data = pushkey.encodekeys(keys)

@getbundle2partsgenerator('obsmarkers')
def _getbundleobsmarkerpart(bundler, repo, source, bundlecaps=None,
                            b2caps=None, heads=None, **kwargs):
    """add an obsolescence markers part to the requested bundle"""
    if kwargs.get('obsmarkers', False):
        if heads is None:
            heads = repo.heads()
        subset = [c.node() for c in repo.set('::%ln', heads)]
        markers = repo.obsstore.relevantmarkers(subset)
        markers = sorted(markers)
        bundle2.buildobsmarkerspart(bundler, markers)

@getbundle2partsgenerator('hgtagsfnodes')
def _getbundletagsfnodes(bundler, repo, source, bundlecaps=None,
                         b2caps=None, heads=None, common=None,
                         **kwargs):
    """Transfer the .hgtags filenodes mapping.

    Only values for heads in this bundle will be transferred.

    The part data consists of pairs of 20 byte changeset nodes and .hgtags
    filenodes raw values.
    """
    # Don't send unless:
    # - changesets are being exchanged,
    # - the client supports it.
    if not (kwargs.get('cg', True) and 'hgtagsfnodes' in b2caps):
        return

    outgoing = _computeoutgoing(repo, heads, common)
    bundle2.addparttagsfnodescache(repo, bundler, outgoing)

def _getbookmarks(repo, **kwargs):
    """Returns the bookmark to node mapping.

    This function is primarily used to generate the `bookmarks` bundle2 part.
    It is a separate function in order to make it easy to wrap it
    in extensions. Passing `kwargs` to the function makes it easy to
    add new parameters in extensions.
    """

    return dict(bookmod.listbinbookmarks(repo))

def check_heads(repo, their_heads, context):
    """check if the heads of a repo have been modified

    Used by peer for unbundling.
    """
    heads = repo.heads()
    heads_hash = hashlib.sha1(''.join(sorted(heads))).digest()
    if not (their_heads == ['force'] or their_heads == heads or
            their_heads == ['hashed', heads_hash]):
        # someone else committed/pushed/unbundled while we
        # were transferring data
        raise error.PushRaced('repository changed while %s - '
                              'please try again' % context)

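# Editorial sketch (not part of the original file): how a client computes
# the 'hashed' form that check_heads() compares against. `observedheads`
# is a hypothetical list of 20-byte binary node ids recorded before the
# push started:
#
#     import hashlib
#     heads_hash = hashlib.sha1(''.join(sorted(observedheads))).digest()
#     their_heads = ['hashed', heads_hash]
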
def unbundle(repo, cg, heads, source, url):
    """Apply a bundle to a repo.

    This function makes sure the repo is locked during the application and has
    a mechanism to check that no push race occurred between the creation of
    the bundle and its application.

    If the push was raced, a PushRaced exception is raised."""
    r = 0
    # need a transaction when processing a bundle2 stream
    # [wlock, lock, tr] - needs to be an array so nested functions can modify it
    lockandtr = [None, None, None]
    recordout = None
    # quick fix for output mismatch with bundle2 in 3.4
    captureoutput = repo.ui.configbool('experimental', 'bundle2-output-capture')
    if url.startswith('remote:http:') or url.startswith('remote:https:'):
        captureoutput = True
    try:
        # note: outside bundle1, 'heads' is expected to be empty and this
        # 'check_heads' call will be a no-op
        check_heads(repo, heads, 'uploading changes')
        # push can proceed
        if not isinstance(cg, bundle2.unbundle20):
            # legacy case: bundle1 (changegroup 01)
            txnname = "\n".join([source, util.hidepassword(url)])
            with repo.lock(), repo.transaction(txnname) as tr:
                op = bundle2.applybundle(repo, cg, tr, source, url)
                r = bundle2.combinechangegroupresults(op)
        else:
            r = None
            try:
                def gettransaction():
                    if not lockandtr[2]:
                        lockandtr[0] = repo.wlock()
                        lockandtr[1] = repo.lock()
                        lockandtr[2] = repo.transaction(source)
                        lockandtr[2].hookargs['source'] = source
                        lockandtr[2].hookargs['url'] = url
                        lockandtr[2].hookargs['bundle2'] = '1'
                    return lockandtr[2]

                # Do greedy locking by default until we're satisfied with lazy
                # locking.
                if not repo.ui.configbool('experimental', 'bundle2lazylocking'):
                    gettransaction()

                op = bundle2.bundleoperation(repo, gettransaction,
                                             captureoutput=captureoutput)
                try:
                    op = bundle2.processbundle(repo, cg, op=op)
                finally:
                    r = op.reply
                    if captureoutput and r is not None:
                        repo.ui.pushbuffer(error=True, subproc=True)
                        def recordout(output):
                            r.newpart('output', data=output, mandatory=False)
                if lockandtr[2] is not None:
                    lockandtr[2].close()
            except BaseException as exc:
                exc.duringunbundle2 = True
                if captureoutput and r is not None:
                    parts = exc._bundle2salvagedoutput = r.salvageoutput()
                    def recordout(output):
                        part = bundle2.bundlepart('output', data=output,
                                                  mandatory=False)
                        parts.append(part)
                raise
    finally:
        lockmod.release(lockandtr[2], lockandtr[1], lockandtr[0])
        if recordout is not None:
            recordout(repo.ui.popbuffer())
    return r

def _maybeapplyclonebundle(pullop):
    """Apply a clone bundle from a remote, if possible."""

    repo = pullop.repo
    remote = pullop.remote

    if not repo.ui.configbool('ui', 'clonebundles'):
        return

    # Only run if local repo is empty.
    if len(repo):
        return

    if pullop.heads:
        return

    if not remote.capable('clonebundles'):
        return

    res = remote._call('clonebundles')

    # If we call the wire protocol command, that's good enough to record the
    # attempt.
    pullop.clonebundleattempted = True

    entries = parseclonebundlesmanifest(repo, res)
    if not entries:
        repo.ui.note(_('no clone bundles available on remote; '
                       'falling back to regular clone\n'))
        return

    entries = filterclonebundleentries(repo, entries)
    if not entries:
        # There is a thundering herd concern here. However, if a server
        # operator doesn't advertise bundles appropriate for its clients,
        # they deserve what's coming. Furthermore, from a client's
        # perspective, no automatic fallback would mean not being able to
        # clone!
        repo.ui.warn(_('no compatible clone bundles available on server; '
                       'falling back to regular clone\n'))
        repo.ui.warn(_('(you may want to report this to the server '
                       'operator)\n'))
        return

    entries = sortclonebundleentries(repo.ui, entries)

    url = entries[0]['URL']
    repo.ui.status(_('applying clone bundle from %s\n') % url)
    if trypullbundlefromurl(repo.ui, repo, url):
        repo.ui.status(_('finished applying clone bundle\n'))
    # Bundle failed.
    #
    # We abort by default to avoid the thundering herd of
    # clients flooding a server that was expecting expensive
    # clone load to be offloaded.
    elif repo.ui.configbool('ui', 'clonebundlefallback'):
        repo.ui.warn(_('falling back to normal clone\n'))
    else:
        raise error.Abort(_('error applying bundle'),
                          hint=_('if this error persists, consider contacting '
                                 'the server operator or disable clone '
                                 'bundles via '
                                 '"--config ui.clonebundles=false"'))

def parseclonebundlesmanifest(repo, s):
    """Parses the raw text of a clone bundles manifest.

    Returns a list of dicts. The dicts have a ``URL`` key corresponding
    to the URL and other keys are the attributes for the entry.
    """
    m = []
    for line in s.splitlines():
        fields = line.split()
        if not fields:
            continue
        attrs = {'URL': fields[0]}
        for rawattr in fields[1:]:
            key, value = rawattr.split('=', 1)
            key = urlreq.unquote(key)
            value = urlreq.unquote(value)
            attrs[key] = value

            # Parse BUNDLESPEC into components. This makes client-side
            # preferences easier to specify since you can prefer a single
            # component of the BUNDLESPEC.
            if key == 'BUNDLESPEC':
                try:
                    comp, version, params = parsebundlespec(repo, value,
                                                            externalnames=True)
                    attrs['COMPRESSION'] = comp
                    attrs['VERSION'] = version
                except error.InvalidBundleSpecification:
                    pass
                except error.UnsupportedBundleSpecification:
                    pass

        m.append(attrs)

    return m

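# Editorial sketch (not part of the original file): a manifest line such as
#
#     https://example.com/full.hg BUNDLESPEC=gzip-v2 REQUIRESNI=true
#
# parses into one entry dict, with COMPRESSION and VERSION derived from
# BUNDLESPEC:
#
#     {'URL': 'https://example.com/full.hg', 'BUNDLESPEC': 'gzip-v2',
#      'REQUIRESNI': 'true', 'COMPRESSION': 'gzip', 'VERSION': 'v2'}
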
def filterclonebundleentries(repo, entries):
    """Remove incompatible clone bundle manifest entries.

    Accepts a list of entries parsed with ``parseclonebundlesmanifest``
    and returns a new list consisting of only the entries that this client
    should be able to apply.

    There is no guarantee we'll be able to apply all returned entries because
    the metadata we use to filter on may be missing or wrong.
    """
    newentries = []
    for entry in entries:
        spec = entry.get('BUNDLESPEC')
        if spec:
            try:
                parsebundlespec(repo, spec, strict=True)
            except error.InvalidBundleSpecification as e:
                repo.ui.debug(str(e) + '\n')
                continue
            except error.UnsupportedBundleSpecification as e:
                repo.ui.debug('filtering %s because unsupported bundle '
                              'spec: %s\n' % (entry['URL'], str(e)))
                continue

        if 'REQUIRESNI' in entry and not sslutil.hassni:
            repo.ui.debug('filtering %s because SNI not supported\n' %
                          entry['URL'])
            continue

        newentries.append(entry)

    return newentries

class clonebundleentry(object):
    """Represents an item in a clone bundles manifest.

    This rich class is needed to support sorting since sorted() in Python 3
    doesn't support ``cmp`` and our comparison is complex enough that ``key=``
    won't work.
    """

    def __init__(self, value, prefers):
        self.value = value
        self.prefers = prefers

    def _cmp(self, other):
        for prefkey, prefvalue in self.prefers:
            avalue = self.value.get(prefkey)
            bvalue = other.value.get(prefkey)

            # Special case for b missing attribute and a matches exactly.
            if avalue is not None and bvalue is None and avalue == prefvalue:
                return -1

            # Special case for a missing attribute and b matches exactly.
            if bvalue is not None and avalue is None and bvalue == prefvalue:
                return 1

            # We can't compare unless attribute present on both.
            if avalue is None or bvalue is None:
                continue

            # Same values should fall back to next attribute.
            if avalue == bvalue:
                continue

            # Exact matches come first.
            if avalue == prefvalue:
                return -1
            if bvalue == prefvalue:
                return 1

            # Fall back to next attribute.
            continue

        # If we got here we couldn't sort by attributes and prefers. Fall
        # back to index order.
        return 0

    def __lt__(self, other):
        return self._cmp(other) < 0

    def __gt__(self, other):
        return self._cmp(other) > 0

    def __eq__(self, other):
        return self._cmp(other) == 0

    def __le__(self, other):
        return self._cmp(other) <= 0

    def __ge__(self, other):
        return self._cmp(other) >= 0

    def __ne__(self, other):
        return self._cmp(other) != 0

def sortclonebundleentries(ui, entries):
    prefers = ui.configlist('ui', 'clonebundleprefers')
    if not prefers:
        return list(entries)

    prefers = [p.split('=', 1) for p in prefers]

    items = sorted(clonebundleentry(v, prefers) for v in entries)
    return [i.value for i in items]

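# Editorial sketch (not part of the original file): with a configuration
# such as
#
#     [ui]
#     clonebundleprefers = COMPRESSION=zstd, VERSION=v2
#
# entries advertising COMPRESSION=zstd sort first, ties are broken by
# VERSION=v2, and anything still tied keeps its manifest order (_cmp()
# above returns 0).
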
def trypullbundlefromurl(ui, repo, url):
    """Attempt to apply a bundle from a URL."""
    with repo.lock(), repo.transaction('bundleurl') as tr:
        try:
            fh = urlmod.open(ui, url)
            cg = readbundle(ui, fh, 'stream')

            if isinstance(cg, streamclone.streamcloneapplier):
                cg.apply(repo)
            else:
                bundle2.applybundle(repo, cg, tr, 'clonebundles', url)
            return True
        except urlerr.httperror as e:
            ui.warn(_('HTTP error fetching bundle: %s\n') % str(e))
        except urlerr.urlerror as e:
            ui.warn(_('error fetching bundle: %s\n') % e.reason)

    return False