exchange: don't attempt phase exchange if phase-heads was in bundle...
Martin von Zweigbergk
r33887:13dc7f29 default
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows

- magic string
- stream level parameters
- payload parts (any number)
- end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

Binary format is as follows

:params size: int32

  The total number of bytes used by the parameters

:params value: arbitrary number of bytes

  A blob of `params size` containing the serialized version of all stream
  level parameters.

  The blob contains a space separated list of parameters. Parameters with a
  value are stored in the form `<name>=<value>`. Both name and value are
  urlquoted.

  Empty names are forbidden.

  A name MUST start with a letter. If this first letter is lower case, the
  parameter is advisory and can be safely ignored. However, when the first
  letter is capital, the parameter is mandatory and the bundling process MUST
  stop if it is not able to process it.

  Stream parameters use a simple textual format for two main reasons:

  - Stream level parameters should remain simple and we want to discourage any
    crazy usage.
  - Textual data allow easy human inspection of a bundle2 header in case of
    trouble.

  Any application level options MUST go into a bundle2 part instead.

Payload part
------------------------

Binary format is as follows

:header size: int32

  The total number of bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

    The header defines how to interpret the part. It contains two pieces of
    data: the part type, and the part parameters.

    The part type is used to route the part to an application level handler
    that can interpret the payload.

    Part parameters are passed to the application level handler. They are
    meant to convey information that will help the application level object
    interpret the part payload.

    The binary format of the header is as follows

    :typesize: (one byte)

    :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

    :partid: A 32-bit integer (unique in the bundle) that can be used to refer
             to this part.

    :parameters:

        A part's parameters may have arbitrary content, the binary structure
        is::

            <mandatory-count><advisory-count><param-sizes><param-data>

        :mandatory-count: 1 byte, number of mandatory parameters

        :advisory-count: 1 byte, number of advisory parameters

        :param-sizes:

            N pairs of bytes, where N is the total number of parameters. Each
            pair contains (<size-of-key>, <size-of-value>) for one parameter.

        :param-data:

            A blob of bytes from which each parameter key and value can be
            retrieved using the list of size pairs stored in the previous
            field.

    Mandatory parameters come first, then the advisory ones.

    Each parameter's key MUST be unique within the part.

:payload:

    The payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part type
contains any uppercase char it is considered mandatory. When no handler is
known for a Mandatory part, the process is aborted and an exception is raised.
If the part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read after an abort are processed. In
the future, dropping the stream may become an option for channels we do not
care to preserve.
"""

from __future__ import absolute_import, division

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    phases,
    pushkey,
    pycompat,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

_fphasesentry = '>i20s'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

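# Editor's illustration (hypothetical helper, not part of the original
# module): the layout described in the module docstring, built by hand. A
# bundle with no stream parameters and no parts is just the magic string, a
# zero int32 parameter-block size, and a zero int32 part-header size acting
# as the end-of-stream marker.
def _demoemptybundle():
    return 'HG20' + _pack(_fstreamparamsize, 0) + _pack(_fpartheadersize, 0)
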
def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains invalid characters"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>' + ('BB' * nbparams)

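# Editor's sketch (hypothetical, not part of the original module): decoding
# the (key, value) size pairs of a part header with the dynamic format built
# above, assuming the two count bytes were already consumed.
def _demoreadparamsizes(blob, nbparams):
    fmt = _makefpartparamsizes(nbparams)
    sizes = _unpack(fmt, blob[:struct.calcsize(fmt)])
    # regroup the flat tuple into (key size, value size) pairs
    return zip(sizes[0::2], sizes[1::2])
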
parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__

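# Editor's sketch (hypothetical, not part of the original module): typical
# use of unbundlerecords while processing a bundle.
def _demorecords():
    records = unbundlerecords()
    records.add('changegroup', {'return': 1})   # record one category entry
    assert records['changegroup'] == ({'return': 1},)
    assert list(records) == [('changegroup', {'return': 1})]
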
class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object currently has very little content; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.reply = None
        self.captureoutput = captureoutput
        self.hookargs = {}
        self._gettransaction = transactiongetter

    def gettransaction(self):
        transaction = self._gettransaction()

        if self.hookargs:
            # the ones added to the transaction supersede those added
            # to the operation.
            self.hookargs.update(transaction.hookargs)
            transaction.hookargs = self.hookargs

            # mark the hookargs as flushed. further attempts to add to
            # hookargs will result in an abort.
            self.hookargs = None

        return transaction

    def addhookargs(self, hookargs):
        if self.hookargs is None:
            raise error.ProgrammingError('attempted to add hookargs to '
                                         'operation after transaction started')
        self.hookargs.update(hookargs)

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    Unknown Mandatory parts will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op._gettransaction is None or op._gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except Exception as exc:
        # Any exceptions seeking to the end of the bundle at this point are
        # almost certainly related to the underlying stream being bad.
        # And, chances are that the exception we're handling is related to
        # getting in that bad state. So, we swallow the seeking error and
        # re-raise the original error.
        seekerror = False
        try:
            for nbpart, part in iterparts:
                # consume the bundle content
                part.seek(0, 2)
        except Exception:
            seekerror = True

        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from processing the old format. This is mostly needed
        # to handle different return codes to unbundle according to the
        # type of bundle. We should probably clean up or drop this return
        # code craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged

        # Re-raising from a variable loses the original stack. So only use
        # that form if we need to.
        if seekerror:
            raise exc
        else:
            raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op

def _processchangegroup(op, cg, tr, source, url, **kwargs):
    ret = cg.apply(op.repo, tr, source, url, **kwargs)
    op.records.add('changegroup', {
        'return': ret,
    })
    return ret

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    hardabort = False
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                      params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam(
                    'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
    # If exiting or interrupted, do not attempt to seek the stream in the
    # finally block below. This makes abort faster.
    except (SystemExit, KeyboardInterrupt):
        hardabort = True
        raise
    finally:
        # consume the part content to not corrupt the stream.
        if not hardabort:
            part.seek(0, 2)


def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

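# Editor's sketch (hypothetical, not part of the original module): the two
# helpers above round-trip, with valueless capabilities mapping to an empty
# list.
def _democaps():
    caps = decodecaps('HG20\nchangegroup=01,02')
    assert caps == {'HG20': [], 'changegroup': ['01', '02']}
    assert encodecaps(caps) == 'HG20\nchangegroup=01,02'
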
bundletypes = {
    "": ("", 'UN'),       # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged


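# Editor's sketch (hypothetical, not part of the original module): assembling
# a minimal bundle2 container and streaming it out. Assumes a ``ui`` object
# and a writable binary file object ``fp``.
def _demowritebundle(ui, fp):
    bundler = bundle20(ui)
    bundler.newpart('output', data='hello', mandatory=False)
    for chunk in bundler.getchunks():
        fp.write(chunk)
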
class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

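# Editor's sketch (hypothetical, not part of the original module): reading a
# bundle back and iterating over its parts. Assumes a ``ui`` object and a
# readable binary ``fp`` positioned at the start of a bundle2 stream.
def _demoreadbundle(ui, fp):
    unbundler = getunbundler(ui, fp)
    for part in unbundler.iterparts():
        ui.debug('saw part: %s\n' % part.type)
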
714 class unbundle20(unpackermixin):
714 class unbundle20(unpackermixin):
715 """interpret a bundle2 stream
715 """interpret a bundle2 stream
716
716
717 This class is fed with a binary stream and yields parts through its
717 This class is fed with a binary stream and yields parts through its
718 `iterparts` methods."""
718 `iterparts` methods."""
719
719
720 _magicstring = 'HG20'
720 _magicstring = 'HG20'
721
721
722 def __init__(self, ui, fp):
722 def __init__(self, ui, fp):
723 """If header is specified, we do not read it out of the stream."""
723 """If header is specified, we do not read it out of the stream."""
724 self.ui = ui
724 self.ui = ui
725 self._compengine = util.compengines.forbundletype('UN')
725 self._compengine = util.compengines.forbundletype('UN')
726 self._compressed = None
726 self._compressed = None
727 super(unbundle20, self).__init__(fp)
727 super(unbundle20, self).__init__(fp)
728
728
729 @util.propertycache
729 @util.propertycache
730 def params(self):
730 def params(self):
731 """dictionary of stream level parameters"""
731 """dictionary of stream level parameters"""
732 indebug(self.ui, 'reading bundle2 stream parameters')
732 indebug(self.ui, 'reading bundle2 stream parameters')
733 params = {}
733 params = {}
734 paramssize = self._unpack(_fstreamparamsize)[0]
734 paramssize = self._unpack(_fstreamparamsize)[0]
735 if paramssize < 0:
735 if paramssize < 0:
736 raise error.BundleValueError('negative bundle param size: %i'
736 raise error.BundleValueError('negative bundle param size: %i'
737 % paramssize)
737 % paramssize)
738 if paramssize:
738 if paramssize:
739 params = self._readexact(paramssize)
739 params = self._readexact(paramssize)
740 params = self._processallparams(params)
740 params = self._processallparams(params)
741 return params
741 return params
742
742
743 def _processallparams(self, paramsblock):
743 def _processallparams(self, paramsblock):
744 """"""
744 """"""
745 params = util.sortdict()
745 params = util.sortdict()
746 for p in paramsblock.split(' '):
746 for p in paramsblock.split(' '):
747 p = p.split('=', 1)
747 p = p.split('=', 1)
748 p = [urlreq.unquote(i) for i in p]
748 p = [urlreq.unquote(i) for i in p]
749 if len(p) < 2:
749 if len(p) < 2:
750 p.append(None)
750 p.append(None)
751 self._processparam(*p)
751 self._processparam(*p)
752 params[p[0]] = p[1]
752 params[p[0]] = p[1]
753 return params
753 return params
754
754
755
755
756 def _processparam(self, name, value):
756 def _processparam(self, name, value):
757 """process a parameter, applying its effect if needed
757 """process a parameter, applying its effect if needed
758
758
759 Parameter starting with a lower case letter are advisory and will be
759 Parameter starting with a lower case letter are advisory and will be
760 ignored when unknown. Those starting with an upper case letter are
760 ignored when unknown. Those starting with an upper case letter are
761 mandatory and will this function will raise a KeyError when unknown.
761 mandatory and will this function will raise a KeyError when unknown.
762
762
763 Note: no option are currently supported. Any input will be either
763 Note: no option are currently supported. Any input will be either
764 ignored or failing.
764 ignored or failing.
765 """
765 """
766 if not name:
766 if not name:
767 raise ValueError('empty parameter name')
767 raise ValueError('empty parameter name')
768 if name[0] not in pycompat.bytestr(string.ascii_letters):
768 if name[0] not in pycompat.bytestr(string.ascii_letters):
769 raise ValueError('non letter first character: %r' % name)
769 raise ValueError('non letter first character: %r' % name)
770 try:
770 try:
771 handler = b2streamparamsmap[name.lower()]
771 handler = b2streamparamsmap[name.lower()]
772 except KeyError:
772 except KeyError:
773 if name[0].islower():
773 if name[0].islower():
774 indebug(self.ui, "ignoring unknown parameter %r" % name)
774 indebug(self.ui, "ignoring unknown parameter %r" % name)
775 else:
775 else:
776 raise error.BundleUnknownFeatureError(params=(name,))
776 raise error.BundleUnknownFeatureError(params=(name,))
777 else:
777 else:
778 handler(self, name, value)
778 handler(self, name, value)
779
779
780 def _forwardchunks(self):
780 def _forwardchunks(self):
781 """utility to transfer a bundle2 as binary
781 """utility to transfer a bundle2 as binary
782
782
783 This is made necessary by the fact the 'getbundle' command over 'ssh'
783 This is made necessary by the fact the 'getbundle' command over 'ssh'
784 have no way to know then the reply end, relying on the bundle to be
784 have no way to know then the reply end, relying on the bundle to be
785 interpreted to know its end. This is terrible and we are sorry, but we
785 interpreted to know its end. This is terrible and we are sorry, but we
786 needed to move forward to get general delta enabled.
786 needed to move forward to get general delta enabled.
787 """
787 """
788 yield self._magicstring
788 yield self._magicstring
789 assert 'params' not in vars(self)
789 assert 'params' not in vars(self)
790 paramssize = self._unpack(_fstreamparamsize)[0]
790 paramssize = self._unpack(_fstreamparamsize)[0]
791 if paramssize < 0:
791 if paramssize < 0:
792 raise error.BundleValueError('negative bundle param size: %i'
792 raise error.BundleValueError('negative bundle param size: %i'
793 % paramssize)
793 % paramssize)
794 yield _pack(_fstreamparamsize, paramssize)
794 yield _pack(_fstreamparamsize, paramssize)
795 if paramssize:
795 if paramssize:
796 params = self._readexact(paramssize)
796 params = self._readexact(paramssize)
797 self._processallparams(params)
797 self._processallparams(params)
798 yield params
798 yield params
799 assert self._compengine.bundletype == 'UN'
799 assert self._compengine.bundletype == 'UN'
800 # From there, payload might need to be decompressed
800 # From there, payload might need to be decompressed
801 self._fp = self._compengine.decompressorreader(self._fp)
801 self._fp = self._compengine.decompressorreader(self._fp)
802 emptycount = 0
802 emptycount = 0
803 while emptycount < 2:
803 while emptycount < 2:
804 # so we can brainlessly loop
804 # so we can brainlessly loop
805 assert _fpartheadersize == _fpayloadsize
805 assert _fpartheadersize == _fpayloadsize
806 size = self._unpack(_fpartheadersize)[0]
806 size = self._unpack(_fpartheadersize)[0]
807 yield _pack(_fpartheadersize, size)
807 yield _pack(_fpartheadersize, size)
808 if size:
808 if size:
809 emptycount = 0
809 emptycount = 0
810 else:
810 else:
811 emptycount += 1
811 emptycount += 1
812 continue
812 continue
813 if size == flaginterrupt:
813 if size == flaginterrupt:
814 continue
814 continue
815 elif size < 0:
815 elif size < 0:
816 raise error.BundleValueError('negative chunk size: %i')
816 raise error.BundleValueError('negative chunk size: %i')
817 yield self._readexact(size)
817 yield self._readexact(size)
818
818
819
819
820 def iterparts(self):
820 def iterparts(self):
821 """yield all parts contained in the stream"""
821 """yield all parts contained in the stream"""
822 # make sure param have been loaded
822 # make sure param have been loaded
823 self.params
823 self.params
824 # From there, payload need to be decompressed
824 # From there, payload need to be decompressed
825 self._fp = self._compengine.decompressorreader(self._fp)
825 self._fp = self._compengine.decompressorreader(self._fp)
826 indebug(self.ui, 'start extraction of bundle2 parts')
826 indebug(self.ui, 'start extraction of bundle2 parts')
827 headerblock = self._readpartheader()
827 headerblock = self._readpartheader()
828 while headerblock is not None:
828 while headerblock is not None:
829 part = unbundlepart(self.ui, headerblock, self._fp)
829 part = unbundlepart(self.ui, headerblock, self._fp)
830 yield part
830 yield part
831 part.seek(0, 2)
831 part.seek(0, 2)
832 headerblock = self._readpartheader()
832 headerblock = self._readpartheader()
833 indebug(self.ui, 'end of bundle2 stream')
833 indebug(self.ui, 'end of bundle2 stream')
834
834
835 def _readpartheader(self):
835 def _readpartheader(self):
836 """reads a part header size and return the bytes blob
836 """reads a part header size and return the bytes blob
837
837
838 returns None if empty"""
838 returns None if empty"""
839 headersize = self._unpack(_fpartheadersize)[0]
839 headersize = self._unpack(_fpartheadersize)[0]
840 if headersize < 0:
840 if headersize < 0:
841 raise error.BundleValueError('negative part header size: %i'
841 raise error.BundleValueError('negative part header size: %i'
842 % headersize)
842 % headersize)
843 indebug(self.ui, 'part header size: %i' % headersize)
843 indebug(self.ui, 'part header size: %i' % headersize)
844 if headersize:
844 if headersize:
845 return self._readexact(headersize)
845 return self._readexact(headersize)
846 return None
846 return None
847
847
848 def compressed(self):
848 def compressed(self):
849 self.params # load params
849 self.params # load params
850 return self._compressed
850 return self._compressed
851
851
852 def close(self):
852 def close(self):
853 """close underlying file"""
853 """close underlying file"""
854 if util.safehasattr(self._fp, 'close'):
854 if util.safehasattr(self._fp, 'close'):
855 return self._fp.close()
855 return self._fp.close()
856
856
857 formatmap = {'20': unbundle20}
857 formatmap = {'20': unbundle20}
858
858
859 b2streamparamsmap = {}
859 b2streamparamsmap = {}
860
860
861 def b2streamparamhandler(name):
861 def b2streamparamhandler(name):
862 """register a handler for a stream level parameter"""
862 """register a handler for a stream level parameter"""
863 def decorator(func):
863 def decorator(func):
864 assert name not in formatmap
864 assert name not in formatmap
865 b2streamparamsmap[name] = func
865 b2streamparamsmap[name] = func
866 return func
866 return func
867 return decorator
867 return decorator
868
868
@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True

class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. The remote side
    should be able to safely ignore the advisory ones.

    Neither data nor parameters can be modified after generation has begun.
    """

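    # Illustrative sketch (an editorial note, not part of the original
    # module): parts are usually created through ``bundle20.newpart``, which
    # assigns the part id before generation, roughly like:
    #
    #     bundler = bundle20(ui)
    #     part = bundler.newpart('output', data='some payload')
    #     part.addparam('verbose', '1', mandatory=False)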
    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
                % (cls, id(self), self.id, self.type, self.mandatory))

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

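    # For orientation (an editorial note, not part of the original module):
    # ``getchunks`` below emits a part on the wire as
    #
    #     <int32 header size><header><int32 chunk size><chunk>...<int32 0>
    #
    # where the header packs, in order: the type length and type, the part
    # id, the mandatory/advisory parameter counts, the parameter sizes, and
    # the parameter key/value blobs.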
    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            bexc = util.forcebytestr(exc)
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % bexc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % bexc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if (util.safehasattr(self.data, 'next')
            or util.safehasattr(self.data, '__next__')):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


flaginterrupt = -1

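# Editorial note (not part of the original module): a payload chunk size
# equal to ``flaginterrupt`` (-1) on the wire is not a real chunk. It
# announces that an out-of-band part follows, which ``interrupthandler``
# below reads and processes before normal payload streaming resumes.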
class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows an exception raised on the producer side during part
    iteration to be transmitted while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during an interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] #(payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic-related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos, self._tellfp()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

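    # Editorial note (not part of the original module): the (payload offset,
    # file offset) pairs accumulated in ``_chunkindex`` above are what make
    # ``seek`` below possible: a seek maps the target payload position to a
    # chunk via ``_findchunk`` and restarts ``_payloadchunks`` from there.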
    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def _readheader(self):
        """read the header and set up the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of pairs again
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param values
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # the header has been fully read; mark the part as initialized
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] + offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed'))
            self._pos = newpos

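    # Editorial note (not part of the original module): ``whence`` follows
    # the usual file semantics (0: absolute, 1: relative to the current
    # position, 2: relative to the end of the payload), so ``part.seek(0, 2)``
    # consumes the remaining payload and positions at the end of the part.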
    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

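# For example (illustrative, not part of the original module), the 'V'
# prefix is stripped and the numeric versions are returned:
#
#     obsmarkersversion({'obsmarkers': ('V0', 'V1')}) == [0, 1]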
def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The types of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we always want a changegroup in such a bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.getchangegroup(repo, source, outgoing,
                                    version=cgversion)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        part.addparam('nbchanges', str(cg.extras['clcount']),
                      mandatory=False)
    if opts.get('phases') and repo.revs('%ln and secret()',
                                        outgoing.missingheads):
        part.addparam('targetphase', '%d' % phases.secret, mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = []
        for phase in phases.allphases:
            for head in headsbyphase[phase]:
                phasedata.append(_pack(_fphasesentry, phase, head))
        bundler.newpart('phase-heads', data=''.join(phasedata))

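# Editorial note (not part of the original module): the 'phase-heads'
# payload built above is a flat sequence of fixed-size entries, each packing
# a phase number together with a 20-byte head node via the ``_fphasesentry``
# struct format defined earlier in this module ('>i20s' at the time of
# writing, i.e. a big-endian int32 followed by the binary node id).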
def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changesets
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', str(cg.extras['clcount']),
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only support v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

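# Editorial note (not part of the original module): the combination above
# relies on Mercurial's usual addchangegroup return convention: 1 means no
# change in heads, 1+n means n heads were added, -1-n means n heads were
# removed, and 0 short-circuits the whole combination. For instance,
# combining the results [3, -2] gives changedheads = (3 - 1) + (-2 + 1) = 1,
# hence a combined result of 2.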
@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
                             'targetphase'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will be massively reworked
    before being inflicted on any end user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the ones contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    extrakwargs = {}
    targetphase = inpart.params.get('targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

1587 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1587 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1588 ['digest:%s' % k for k in util.DIGESTS.keys()])
1588 ['digest:%s' % k for k in util.DIGESTS.keys()])
1589 @parthandler('remote-changegroup', _remotechangegroupparams)
1589 @parthandler('remote-changegroup', _remotechangegroupparams)
1590 def handleremotechangegroup(op, inpart):
1590 def handleremotechangegroup(op, inpart):
1591 """apply a bundle10 on the repo, given an url and validation information
1591 """apply a bundle10 on the repo, given an url and validation information
1592
1592
1593 All the information about the remote bundle to import are given as
1593 All the information about the remote bundle to import are given as
1594 parameters. The parameters include:
1594 parameters. The parameters include:
1595 - url: the url to the bundle10.
1595 - url: the url to the bundle10.
1596 - size: the bundle10 file size. It is used to validate what was
1596 - size: the bundle10 file size. It is used to validate what was
1597 retrieved by the client matches the server knowledge about the bundle.
1597 retrieved by the client matches the server knowledge about the bundle.
1598 - digests: a space separated list of the digest types provided as
1598 - digests: a space separated list of the digest types provided as
1599 parameters.
1599 parameters.
1600 - digest:<digest-type>: the hexadecimal representation of the digest with
1600 - digest:<digest-type>: the hexadecimal representation of the digest with
1601 that name. Like the size, it is used to validate what was retrieved by
1601 that name. Like the size, it is used to validate what was retrieved by
    the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

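# A minimal sketch (hypothetical, not part of the original module) of the
# size-plus-digest validation that util.digestchecker performs on the
# downloaded stream above. The helper name and the in-memory `data` argument
# are illustrative; the real checker hashes incrementally while reading.
def _exampledigestcheck(data, size, digests):
    """Check `data` against an expected size and hashlib hex digests."""
    import hashlib
    if len(data) != size:
        raise ValueError('size mismatch: expected %d, got %d'
                         % (size, len(data)))
    for typ, expected in sorted(digests.items()):
        actual = hashlib.new(typ, data).hexdigest()
        if actual != expected:
            raise ValueError('%s mismatch: expected %s, got %s'
                             % (typ, expected, actual))
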
@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

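# A minimal sketch (hypothetical, not part of the original module): the
# check:heads payload read above is just a concatenation of 20-byte binary
# nodes with no count prefix, so a client-side encoder is the mirror image:
def _examplecheckheadspayload(heads):
    """Return a check:heads payload for a list of 20-byte binary nodes."""
    assert all(len(h) == 20 for h in heads)
    return ''.join(heads)
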
@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually
    updated during the push. If other activities happen on unrelated heads,
    they are ignored.

    This allows servers with high traffic to avoid push contention as long
    as only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

def _readphaseheads(inpart):
    headsbyphase = [[] for i in phases.allphases]
    entrysize = struct.calcsize(_fphasesentry)
    while True:
        entry = inpart.read(entrysize)
        if len(entry) < entrysize:
            if entry:
                raise error.Abort(_('bad phase-heads bundle part'))
            break
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
    return headsbyphase

@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = _readphaseheads(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
    op.records.add('phase-heads', {})

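# A minimal sketch (hypothetical, not part of the original module) of one
# phase-heads entry as consumed by _readphaseheads above, assuming
# _fphasesentry is the big-endian struct format '>i20s' (a 32-bit phase
# number followed by a 20-byte node, i.e. 24 bytes per entry):
def _examplephaseheadsentry(phase, node):
    """Pack a single (phase, node) pair into phase-heads wire format."""
    assert len(node) == 20
    return struct.pack('>i20s', phase, node)
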
@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers push"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)

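# A minimal sketch (hypothetical, not part of the original module): the
# hgtagsfnodes payload consumed above is a flat run of (changeset node,
# .hgtags filenode) pairs, 40 bytes per pair, terminated simply by the
# first short read:
def _examplefnodespayload(pairs):
    """Return an hgtagsfnodes payload for (node, fnode) 20-byte pairs."""
    return ''.join(node + fnode for node, fnode in pairs)
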
@parthandler('pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on server-side for security reasons
    if op.ui.configbool('push', 'pushvars.server'):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvar flag.
            key = "USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)
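
# A minimal sketch (hypothetical, not part of the original module) of the
# same key transformation as the loop above, in isolation. Given advisory
# params such as [('debug', '1')], hooks would see USERVAR_DEBUG=1:
def _examplepushvars(advisoryparams):
    """Map pushvar (key, value) pairs to USERVAR_-prefixed hook args."""
    return dict(('USERVAR_' + key.upper(), value)
                for key, value in advisoryparams)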
@@ -1,2013 +1,2017 b''
# exchange.py - utility to exchange data between repos.
#
# Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import errno
import hashlib

from .i18n import _
from .node import (
    hex,
    nullid,
)
from . import (
    bookmarks as bookmod,
    bundle2,
    changegroup,
    discovery,
    error,
    lock as lockmod,
    obsolete,
    phases,
    pushkey,
    pycompat,
    scmutil,
    sslutil,
    streamclone,
    url as urlmod,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

# Maps bundle version human names to changegroup versions.
_bundlespeccgversions = {'v1': '01',
                         'v2': '02',
                         'packed1': 's1',
                         'bundle2': '02', #legacy
                        }

# Compression engines allowed in version 1. THIS SHOULD NEVER CHANGE.
_bundlespecv1compengines = {'gzip', 'bzip2', 'none'}

def parsebundlespec(repo, spec, strict=True, externalnames=False):
    """Parse a bundle string specification into parts.

    Bundle specifications denote a well-defined bundle/exchange format.
    The content of a given specification should not change over time in
    order to ensure that bundles produced by a newer version of Mercurial are
    readable from an older version.

    The string currently has the form:

       <compression>-<type>[;<parameter0>[;<parameter1>]]

    Where <compression> is one of the supported compression formats
    and <type> is (currently) a version string. A ";" can follow the type and
    all text afterwards is interpreted as URI encoded, ";" delimited key=value
    pairs.

    If ``strict`` is True (the default) <compression> is required. Otherwise,
    it is optional.

    If ``externalnames`` is False (the default), the human-centric names will
    be converted to their internal representation.

    Returns a 3-tuple of (compression, version, parameters). Compression will
    be ``None`` if not in strict mode and a compression isn't defined.

    An ``InvalidBundleSpecification`` is raised when the specification is
    not syntactically well formed.

    An ``UnsupportedBundleSpecification`` is raised when the compression or
    bundle type/version is not recognized.

    Note: this function will likely eventually return a more complex data
    structure, including bundle2 part information.
    """
    def parseparams(s):
        if ';' not in s:
            return s, {}

        params = {}
        version, paramstr = s.split(';', 1)

        for p in paramstr.split(';'):
            if '=' not in p:
                raise error.InvalidBundleSpecification(
                    _('invalid bundle specification: '
                      'missing "=" in parameter: %s') % p)

            key, value = p.split('=', 1)
            key = urlreq.unquote(key)
            value = urlreq.unquote(value)
            params[key] = value

        return version, params


    if strict and '-' not in spec:
        raise error.InvalidBundleSpecification(
            _('invalid bundle specification; '
              'must be prefixed with compression: %s') % spec)

    if '-' in spec:
        compression, version = spec.split('-', 1)

        if compression not in util.compengines.supportedbundlenames:
            raise error.UnsupportedBundleSpecification(
                _('%s compression is not supported') % compression)

        version, params = parseparams(version)

        if version not in _bundlespeccgversions:
            raise error.UnsupportedBundleSpecification(
                _('%s is not a recognized bundle version') % version)
    else:
        # Value could be just the compression or just the version, in which
        # case some defaults are assumed (but only when not in strict mode).
        assert not strict

        spec, params = parseparams(spec)

        if spec in util.compengines.supportedbundlenames:
            compression = spec
            version = 'v1'
            # Generaldelta repos require v2.
            if 'generaldelta' in repo.requirements:
                version = 'v2'
            # Modern compression engines require v2.
            if compression not in _bundlespecv1compengines:
                version = 'v2'
        elif spec in _bundlespeccgversions:
            if spec == 'packed1':
                compression = 'none'
            else:
                compression = 'bzip2'
            version = spec
        else:
            raise error.UnsupportedBundleSpecification(
                _('%s is not a recognized bundle specification') % spec)

    # Bundle version 1 only supports a known set of compression engines.
    if version == 'v1' and compression not in _bundlespecv1compengines:
        raise error.UnsupportedBundleSpecification(
            _('compression engine %s is not supported on v1 bundles') %
            compression)

    # The specification for packed1 can optionally declare the data formats
    # required to apply it. If we see this metadata, compare against what the
    # repo supports and error if the bundle isn't compatible.
    if version == 'packed1' and 'requirements' in params:
        requirements = set(params['requirements'].split(','))
        missingreqs = requirements - repo.supportedformats
        if missingreqs:
            raise error.UnsupportedBundleSpecification(
                _('missing support for repository features: %s') %
                ', '.join(sorted(missingreqs)))

    if not externalnames:
        engine = util.compengines.forbundlename(compression)
        compression = engine.bundletype()[1]
        version = _bundlespeccgversions[version]
    return compression, version, params

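# An illustrative call (hypothetical helper, not part of the original
# module). With externalnames left False, human-readable names are mapped
# to their internal counterparts, e.g. 'gzip' -> 'GZ' and 'v2' ->
# changegroup '02'; the exact values depend on the compression engines
# registered in util.
def _exampleparsebundlespec(repo):
    comp, version, params = parsebundlespec(repo, 'gzip-v2')
    # expected roughly: comp == 'GZ', version == '02', params == {}
    return comp, version, params
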
def readbundle(ui, fh, fname, vfs=None):
    header = changegroup.readexactly(fh, 4)

    alg = None
    if not fname:
        fname = "stream"
        if not header.startswith('HG') and header.startswith('\0'):
            fh = changegroup.headerlessfixup(fh, header)
            header = "HG10"
            alg = 'UN'
    elif vfs:
        fname = vfs.join(fname)

    magic, version = header[0:2], header[2:4]

    if magic != 'HG':
        raise error.Abort(_('%s: not a Mercurial bundle') % fname)
    if version == '10':
        if alg is None:
            alg = changegroup.readexactly(fh, 2)
        return changegroup.cg1unpacker(fh, alg)
    elif version.startswith('2'):
        return bundle2.getunbundler(ui, fh, magicstring=magic + version)
    elif version == 'S1':
        return streamclone.streamcloneapplier(fh)
    else:
        raise error.Abort(_('%s: unknown bundle version %s') % (fname, version))

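# A standalone summary (hypothetical helper, not part of the original
# module) of the 4-byte header dispatch in readbundle above:
def _exampleheaderkind(header):
    """Classify a 4-byte bundle header the way readbundle does."""
    if header.startswith('\0'):
        return 'headerless changegroup (treated as uncompressed HG10)'
    magic, version = header[0:2], header[2:4]
    if magic != 'HG':
        return 'not a Mercurial bundle'
    if version == '10':
        return 'changegroup v1 (2-byte compression marker follows)'
    if version.startswith('2'):
        return 'bundle2'
    if version == 'S1':
        return 'stream clone bundle'
    return 'unknown bundle version'
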
def getbundlespec(ui, fh):
    """Infer the bundlespec from a bundle file handle.

    The input file handle is seeked and the original seek position is not
    restored.
    """
    def speccompression(alg):
        try:
            return util.compengines.forbundletype(alg).bundletype()[0]
        except KeyError:
            return None

    b = readbundle(ui, fh, None)
    if isinstance(b, changegroup.cg1unpacker):
        alg = b._type
        if alg == '_truncatedBZ':
            alg = 'BZ'
        comp = speccompression(alg)
        if not comp:
            raise error.Abort(_('unknown compression algorithm: %s') % alg)
        return '%s-v1' % comp
    elif isinstance(b, bundle2.unbundle20):
        if 'Compression' in b.params:
            comp = speccompression(b.params['Compression'])
            if not comp:
                raise error.Abort(_('unknown compression algorithm: %s') % comp)
        else:
            comp = 'none'

        version = None
        for part in b.iterparts():
            if part.type == 'changegroup':
                version = part.params['version']
                if version in ('01', '02'):
                    version = 'v2'
                else:
                    raise error.Abort(_('changegroup version %s does not have '
                                        'a known bundlespec') % version,
                                      hint=_('try upgrading your Mercurial '
                                             'client'))

        if not version:
            raise error.Abort(_('could not identify changegroup version in '
                                'bundle'))

        return '%s-%s' % (comp, version)
    elif isinstance(b, streamclone.streamcloneapplier):
        requirements = streamclone.readbundle1header(fh)[2]
        params = 'requirements=%s' % ','.join(sorted(requirements))
        return 'none-packed1;%s' % urlreq.quote(params)
    else:
        raise error.Abort(_('unknown bundle type: %s') % b)

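# A minimal usage sketch (hypothetical helper, not part of the original
# module): inferring a spec from a bundle file on disk. The path and the
# example return values are illustrative only.
def _examplegetbundlespec(ui, path):
    fh = open(path, 'rb')
    try:
        return getbundlespec(ui, fh)    # e.g. 'bzip2-v2' or 'gzip-v1'
    finally:
        fh.close()
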
def _computeoutgoing(repo, heads, common):
    """Computes which revs are outgoing given a set of common
    and a set of heads.

    This is a separate function so extensions can have access to
    the logic.

    Returns a discovery.outgoing object.
    """
    cl = repo.changelog
    if common:
        hasnode = cl.hasnode
        common = [n for n in common if hasnode(n)]
    else:
        common = [nullid]
    if not heads:
        heads = cl.heads()
    return discovery.outgoing(repo, common, heads)

def _forcebundle1(op):
    """return true if a pull/push must use bundle1

    This function is used to allow testing of the older bundle version"""
    ui = op.repo.ui
    forcebundle1 = False
    # The goal of this config is to allow developers to choose the bundle
    # version used during exchange. This is especially handy during tests.
    # Value is a list of bundle versions to be picked from; the highest
    # version should be used.
    #
    # developer config: devel.legacy.exchange
    exchange = ui.configlist('devel', 'legacy.exchange')
    forcebundle1 = 'bundle2' not in exchange and 'bundle1' in exchange
    return forcebundle1 or not op.remote.capable('bundle2')

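# An illustrative hgrc snippet (editorial assumption, not from the original
# source) for the developer config read above; listing only 'bundle1'
# forces the legacy exchange format even against a bundle2-capable peer:
#
#   [devel]
#   legacy.exchange = bundle1
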
class pushoperation(object):
    """An object that represents a single push operation.

    Its purpose is to carry push-related state and very common operations.

    A new pushoperation should be created at the beginning of each push and
    discarded afterward.
    """

    def __init__(self, repo, remote, force=False, revs=None, newbranch=False,
                 bookmarks=(), pushvars=None):
        # repo we push from
        self.repo = repo
        self.ui = repo.ui
        # repo we push to
        self.remote = remote
        # force option provided
        self.force = force
        # revs to be pushed (None is "all")
        self.revs = revs
        # bookmarks explicitly pushed
        self.bookmarks = bookmarks
        # allow push of new branch
        self.newbranch = newbranch
        # steps already performed
        # (used to check what steps have been already performed through bundle2)
        self.stepsdone = set()
        # Integer version of the changegroup push result
        # - None means nothing to push
        # - 0 means HTTP error
        # - 1 means we pushed and remote head count is unchanged *or*
        #   we have outgoing changesets but refused to push
        # - other values as described by addchangegroup()
        self.cgresult = None
        # Boolean value for the bookmark push
        self.bkresult = None
        # discovery.outgoing object (contains common and outgoing data)
        self.outgoing = None
        # all remote topological heads before the push
        self.remoteheads = None
        # Details of the remote branch pre and post push
        #
        # mapping: {'branch': ([remoteheads],
        #                      [newheads],
        #                      [unsyncedheads],
        #                      [discardedheads])}
        # - branch: the branch name
        # - remoteheads: the list of remote heads known locally
        #                None if the branch is new
        # - newheads: the new remote heads (known locally) with outgoing pushed
        # - unsyncedheads: the list of remote heads unknown locally.
        # - discardedheads: the list of remote heads made obsolete by the push
        self.pushbranchmap = None
        # testable as a boolean indicating if any nodes are missing locally.
        self.incoming = None
        # phase changes that must be pushed alongside the changesets
        self.outdatedphases = None
        # phase changes that must be pushed if the changeset push fails
        self.fallbackoutdatedphases = None
        # outgoing obsmarkers
        self.outobsmarkers = set()
        # outgoing bookmarks
        self.outbookmarks = []
        # transaction manager
        self.trmanager = None
        # map {pushkey partid -> callback handling failure}
        # used to handle exceptions from mandatory pushkey part failure
        self.pkfailcb = {}
        # an iterable of pushvars or None
        self.pushvars = pushvars

    @util.propertycache
    def futureheads(self):
        """future remote heads if the changeset push succeeds"""
        return self.outgoing.missingheads

    @util.propertycache
    def fallbackheads(self):
        """future remote heads if the changeset push fails"""
        if self.revs is None:
            # no target to push, all common heads are relevant
            return self.outgoing.commonheads
        unfi = self.repo.unfiltered()
        # I want cheads = heads(::missingheads and ::commonheads)
        # (missingheads is revs with secret changesets filtered out)
        #
        # This can be expressed as:
        #     cheads = ( (missingheads and ::commonheads)
        #              + (commonheads and ::missingheads))
        #
        # while trying to push we already computed the following:
        #     common = (::commonheads)
        #     missing = ((commonheads::missingheads) - commonheads)
        #
        # We can pick:
        # * missingheads part of common (::commonheads)
        common = self.outgoing.common
        nm = self.repo.changelog.nodemap
        cheads = [node for node in self.revs if nm[node] in common]
        # and
        # * commonheads parents on missing
        revset = unfi.set('%ln and parents(roots(%ln))',
                          self.outgoing.commonheads,
                          self.outgoing.missing)
        cheads.extend(c.node() for c in revset)
        return cheads

    @property
    def commonheads(self):
        """set of all common heads after changeset bundle push"""
        if self.cgresult:
            return self.futureheads
        else:
            return self.fallbackheads

    # mapping of messages used when pushing bookmarks
    bookmsgmap = {'update': (_("updating bookmark %s\n"),
                             _('updating bookmark %s failed!\n')),
                  'export': (_("exporting bookmark %s\n"),
                             _('exporting bookmark %s failed!\n')),
                  'delete': (_("deleting remote bookmark %s\n"),
                             _('deleting remote bookmark %s failed!\n')),
                  }


def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=(),
         opargs=None):
    '''Push outgoing changesets (limited by revs) from a local
    repository to remote. Return an integer:
      - None means nothing to push
      - 0 means HTTP error
      - 1 means we pushed and remote head count is unchanged *or*
        we have outgoing changesets but refused to push
      - other values as described by addchangegroup()
    '''
    if opargs is None:
        opargs = {}
    pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks,
                           **opargs)
    if pushop.remote.local():
        missing = (set(pushop.repo.requirements)
                   - pushop.remote.local().supported)
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise error.Abort(msg)

    if not pushop.remote.canpush():
        raise error.Abort(_("destination does not support push"))

    if not pushop.remote.capable('unbundle'):
        raise error.Abort(_('cannot push: destination does not support the '
                            'unbundle wire protocol command'))

    # get lock as we might write phase data
    wlock = lock = None
    try:
        # bundle2 push may receive a reply bundle touching bookmarks or other
        # things requiring the wlock. Take it now to ensure proper ordering.
        maypushback = pushop.ui.configbool('experimental', 'bundle2.pushback')
        if (not _forcebundle1(pushop)) and maypushback:
            wlock = pushop.repo.wlock()
        lock = pushop.repo.lock()
        pushop.trmanager = transactionmanager(pushop.repo,
                                              'push-response',
                                              pushop.remote.url())
    except IOError as err:
        if err.errno != errno.EACCES:
            raise
        # source repo cannot be locked.
        # We do not abort the push, but just disable the local phase
        # synchronisation.
        msg = 'cannot lock source repository: %s\n' % err
        pushop.ui.debug(msg)

    with wlock or util.nullcontextmanager(), \
         lock or util.nullcontextmanager(), \
         pushop.trmanager or util.nullcontextmanager():
        pushop.repo.checkpush(pushop)
        _pushdiscovery(pushop)
        if not _forcebundle1(pushop):
            _pushbundle2(pushop)
        _pushchangeset(pushop)
        _pushsyncphase(pushop)
        _pushobsolete(pushop)
        _pushbookmark(pushop)

    return pushop

# list of steps to perform discovery before push
pushdiscoveryorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
pushdiscoverymapping = {}

def pushdiscovery(stepname):
    """decorator for functions performing discovery before push

    The function is added to the step -> function mapping and appended to the
    list of steps. Beware that decorated functions will be added in order
    (this may matter).

    You can only use this decorator for a new step; if you want to wrap a step
    from an extension, change the pushdiscovery dictionary directly."""
    def dec(func):
        assert stepname not in pushdiscoverymapping
        pushdiscoverymapping[stepname] = func
        pushdiscoveryorder.append(stepname)
        return func
    return dec

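# A hypothetical extension-style registration (not part of the original
# module) showing how a new discovery step would hook into the decorator
# above; the step name and function are illustrative only.
@pushdiscovery('example')
def _pushdiscoveryexample(pushop):
    """No-op discovery step demonstrating the registration pattern."""
    pushop.ui.debug('example discovery step ran\n')
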
def _pushdiscovery(pushop):
    """Run all discovery steps"""
    for stepname in pushdiscoveryorder:
        step = pushdiscoverymapping[stepname]
        step(pushop)

@pushdiscovery('changeset')
def _pushdiscoverychangeset(pushop):
    """discover the changesets that need to be pushed"""
    fci = discovery.findcommonincoming
    commoninc = fci(pushop.repo, pushop.remote, force=pushop.force)
    common, inc, remoteheads = commoninc
    fco = discovery.findcommonoutgoing
    outgoing = fco(pushop.repo, pushop.remote, onlyheads=pushop.revs,
                   commoninc=commoninc, force=pushop.force)
    pushop.outgoing = outgoing
    pushop.remoteheads = remoteheads
    pushop.incoming = inc

521 @pushdiscovery('phase')
521 @pushdiscovery('phase')
522 def _pushdiscoveryphase(pushop):
522 def _pushdiscoveryphase(pushop):
523 """discover the phase that needs to be pushed
523 """discover the phase that needs to be pushed
524
524
525 (computed for both success and failure case for changesets push)"""
525 (computed for both success and failure case for changesets push)"""
526 outgoing = pushop.outgoing
526 outgoing = pushop.outgoing
527 unfi = pushop.repo.unfiltered()
527 unfi = pushop.repo.unfiltered()
528 remotephases = pushop.remote.listkeys('phases')
528 remotephases = pushop.remote.listkeys('phases')
529 publishing = remotephases.get('publishing', False)
529 publishing = remotephases.get('publishing', False)
530 if (pushop.ui.configbool('ui', '_usedassubrepo')
530 if (pushop.ui.configbool('ui', '_usedassubrepo')
531 and remotephases # server supports phases
531 and remotephases # server supports phases
532 and not pushop.outgoing.missing # no changesets to be pushed
532 and not pushop.outgoing.missing # no changesets to be pushed
533 and publishing):
533 and publishing):
534 # When:
534 # When:
535 # - this is a subrepo push
535 # - this is a subrepo push
536 # - and remote support phase
536 # - and remote support phase
537 # - and no changeset are to be pushed
537 # - and no changeset are to be pushed
538 # - and remote is publishing
538 # - and remote is publishing
539 # We may be in issue 3871 case!
539 # We may be in issue 3871 case!
540 # We drop the possible phase synchronisation done by
540 # We drop the possible phase synchronisation done by
541 # courtesy to publish changesets possibly locally draft
541 # courtesy to publish changesets possibly locally draft
542 # on the remote.
542 # on the remote.
543 remotephases = {'publishing': 'True'}
543 remotephases = {'publishing': 'True'}
544 ana = phases.analyzeremotephases(pushop.repo,
544 ana = phases.analyzeremotephases(pushop.repo,
545 pushop.fallbackheads,
545 pushop.fallbackheads,
546 remotephases)
546 remotephases)
547 pheads, droots = ana
547 pheads, droots = ana
548 extracond = ''
548 extracond = ''
549 if not publishing:
549 if not publishing:
550 extracond = ' and public()'
550 extracond = ' and public()'
551 revset = 'heads((%%ln::%%ln) %s)' % extracond
551 revset = 'heads((%%ln::%%ln) %s)' % extracond
552 # Get the list of all revs draft on remote by public here.
552 # Get the list of all revs draft on remote by public here.
553 # XXX Beware that revset break if droots is not strictly
553 # XXX Beware that revset break if droots is not strictly
554 # XXX root we may want to ensure it is but it is costly
554 # XXX root we may want to ensure it is but it is costly
555 fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
555 fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
556 if not outgoing.missing:
556 if not outgoing.missing:
557 future = fallback
557 future = fallback
558 else:
558 else:
559 # adds changeset we are going to push as draft
559 # adds changeset we are going to push as draft
560 #
560 #
561 # should not be necessary for publishing server, but because of an
561 # should not be necessary for publishing server, but because of an
562 # issue fixed in xxxxx we have to do it anyway.
562 # issue fixed in xxxxx we have to do it anyway.
563 fdroots = list(unfi.set('roots(%ln + %ln::)',
563 fdroots = list(unfi.set('roots(%ln + %ln::)',
564 outgoing.missing, droots))
564 outgoing.missing, droots))
565 fdroots = [f.node() for f in fdroots]
565 fdroots = [f.node() for f in fdroots]
566 future = list(unfi.set(revset, fdroots, pushop.futureheads))
566 future = list(unfi.set(revset, fdroots, pushop.futureheads))
567 pushop.outdatedphases = future
567 pushop.outdatedphases = future
568 pushop.fallbackoutdatedphases = fallback
568 pushop.fallbackoutdatedphases = fallback
569
569
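# Illustrative sketch (not part of exchange.py; the helper name is made up):
# the double '%%ln' in the revset template above survives the '%'
# interpolation of 'extracond', leaving literal '%ln' placeholders for
# repo.set() to fill with node lists later.
def _demorevsettemplate():
    extracond = ' and public()'
    revset = 'heads((%%ln::%%ln) %s)' % extracond
    assert revset == 'heads((%ln::%ln) and public())'
    return revset
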
@pushdiscovery('obsmarker')
def _pushdiscoveryobsmarkers(pushop):
    if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
        and pushop.repo.obsstore
        and 'obsolete' in pushop.remote.listkeys('namespaces')):
        repo = pushop.repo
        # very naive computation that can be quite expensive on big repos.
        # However: evolution is currently slow on them anyway.
        nodes = (c.node() for c in repo.set('::%ln', pushop.futureheads))
        pushop.outobsmarkers = pushop.repo.obsstore.relevantmarkers(nodes)

@pushdiscovery('bookmarks')
def _pushdiscoverybookmarks(pushop):
    ui = pushop.ui
    repo = pushop.repo.unfiltered()
    remote = pushop.remote
    ui.debug("checking for updated bookmarks\n")
    ancestors = ()
    if pushop.revs:
        revnums = map(repo.changelog.rev, pushop.revs)
        ancestors = repo.changelog.ancestors(revnums, inclusive=True)
    remotebookmark = remote.listkeys('bookmarks')

    explicit = set([repo._bookmarks.expandname(bookmark)
                    for bookmark in pushop.bookmarks])

    remotebookmark = bookmod.unhexlifybookmarks(remotebookmark)
    comp = bookmod.comparebookmarks(repo, repo._bookmarks, remotebookmark)

    def safehex(x):
        if x is None:
            return x
        return hex(x)

    def hexifycompbookmarks(bookmarks):
        for b, scid, dcid in bookmarks:
            yield b, safehex(scid), safehex(dcid)

    comp = [hexifycompbookmarks(marks) for marks in comp]
    addsrc, adddst, advsrc, advdst, diverge, differ, invalid, same = comp

    for b, scid, dcid in advsrc:
        if b in explicit:
            explicit.remove(b)
        if not ancestors or repo[scid].rev() in ancestors:
            pushop.outbookmarks.append((b, dcid, scid))
    # search for added bookmarks
    for b, scid, dcid in addsrc:
        if b in explicit:
            explicit.remove(b)
        pushop.outbookmarks.append((b, '', scid))
    # search for overwritten bookmarks
    for b, scid, dcid in list(advdst) + list(diverge) + list(differ):
        if b in explicit:
            explicit.remove(b)
        pushop.outbookmarks.append((b, dcid, scid))
    # search for bookmarks to delete
    for b, scid, dcid in adddst:
        if b in explicit:
            explicit.remove(b)
        # treat as "deleted locally"
        pushop.outbookmarks.append((b, dcid, ''))
    # identical bookmarks shouldn't get reported
    for b, scid, dcid in same:
        if b in explicit:
            explicit.remove(b)

    if explicit:
        explicit = sorted(explicit)
        # we should probably list all of them
        ui.warn(_('bookmark %s does not exist on the local '
                  'or remote repository!\n') % explicit[0])
        pushop.bkresult = 2

    pushop.outbookmarks.sort()

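# Self-contained sketch (illustration only; the helper is hypothetical) of
# the bookkeeping pattern above: explicitly requested names are checked off
# while the comparison buckets are walked, and any leftover name exists on
# neither side, which is what triggers the warning and bkresult = 2.
def _demoexplicitcheck(requested, buckets):
    explicit = set(requested)
    out = []
    for bucket in buckets:
        for name, scid, dcid in bucket:
            explicit.discard(name)
            out.append((name, dcid, scid))
    return out, sorted(explicit)  # leftovers would be warned about
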
def _pushcheckoutgoing(pushop):
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    if not outgoing.missing:
        # nothing to push
        scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
        return False
    # something to push
    if not pushop.force:
        # if repo.obsstore == False --> no obsolete
        # then, save the iteration
        if unfi.obsstore:
            # these messages are defined here to stay within the 80-char limit
            mso = _("push includes obsolete changeset: %s!")
            mspd = _("push includes phase-divergent changeset: %s!")
            mscd = _("push includes content-divergent changeset: %s!")
            mst = {"orphan": _("push includes orphan changeset: %s!"),
                   "phase-divergent": mspd,
                   "content-divergent": mscd}
            # If there is at least one obsolete or unstable
            # changeset in missing, at least one of the missing
            # heads will be obsolete or unstable. So checking
            # heads only is ok.
            for node in outgoing.missingheads:
                ctx = unfi[node]
                if ctx.obsolete():
                    raise error.Abort(mso % ctx)
                elif ctx.isunstable():
                    # TODO print more than one instability in the abort
                    # message
                    raise error.Abort(mst[ctx.instabilities()[0]] % ctx)

    discovery.checkheads(pushop)
    return True

# List of names of steps to perform for an outgoing bundle2, order matters.
b2partsgenorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
b2partsgenmapping = {}

def b2partsgenerator(stepname, idx=None):
    """decorator for functions generating bundle2 parts

    The function is added to the step -> function mapping and appended to the
    list of steps. Beware that decorated functions will be added in order
    (this may matter).

    You can only use this decorator for new steps; if you want to wrap a step
    from an extension, change the b2partsgenmapping dictionary directly."""
    def dec(func):
        assert stepname not in b2partsgenmapping
        b2partsgenmapping[stepname] = func
        if idx is None:
            b2partsgenorder.append(stepname)
        else:
            b2partsgenorder.insert(idx, stepname)
        return func
    return dec

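# Hypothetical extension sketch (not part of this file): per the docstring
# above, wrapping an existing step goes through b2partsgenmapping rather
# than the decorator. The step name 'phase' is real; the wrapper is made up.
def _demowrapphasestep():
    origphase = b2partsgenmapping['phase']
    def wrappedphase(pushop, bundler):
        pushop.ui.debug('generating phase parts\n')
        return origphase(pushop, bundler)
    b2partsgenmapping['phase'] = wrappedphase
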
def _pushb2ctxcheckheads(pushop, bundler):
    """Generate race condition checking parts

    Exists as an independent function to aid extensions
    """
    # * 'force' does not check for push races,
    # * if we don't push anything, there is nothing to check.
    if not pushop.force and pushop.outgoing.missingheads:
        allowunrelated = 'related' in bundler.capabilities.get('checkheads', ())
        emptyremote = pushop.pushbranchmap is None
        if not allowunrelated or emptyremote:
            bundler.newpart('check:heads', data=iter(pushop.remoteheads))
        else:
            affected = set()
            for branch, heads in pushop.pushbranchmap.iteritems():
                remoteheads, newheads, unsyncedheads, discardedheads = heads
                if remoteheads is not None:
                    remote = set(remoteheads)
                    affected |= set(discardedheads) & remote
                    affected |= remote - set(newheads)
            if affected:
                data = iter(sorted(affected))
                bundler.newpart('check:updated-heads', data=data)

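# Self-contained sketch of the 'affected' computation above, with made-up
# single-letter head names instead of binary nodes: remote heads that were
# discarded, or that are no longer heads after the push, must be re-checked
# by the server.
def _demoaffectedheads(pushbranchmap):
    affected = set()
    for branch, heads in pushbranchmap.items():
        remoteheads, newheads, unsyncedheads, discardedheads = heads
        if remoteheads is not None:
            remote = set(remoteheads)
            affected |= set(discardedheads) & remote
            affected |= remote - set(newheads)
    return sorted(affected)

# _demoaffectedheads({'default': (['a', 'b'], ['b', 'c'], [], ['a'])})
# -> ['a']
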
@b2partsgenerator('changeset')
def _pushb2ctx(pushop, bundler):
    """handle changegroup push through bundle2

    addchangegroup result is stored in the ``pushop.cgresult`` attribute.
    """
    if 'changesets' in pushop.stepsdone:
        return
    pushop.stepsdone.add('changesets')
    # Send known heads to the server for race detection.
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop)

    _pushb2ctxcheckheads(pushop, bundler)

    b2caps = bundle2.bundle2caps(pushop.remote)
    version = '01'
    cgversions = b2caps.get('changegroup')
    if cgversions:  # 3.1 and 3.2 ship with an empty value
        cgversions = [v for v in cgversions
                      if v in changegroup.supportedoutgoingversions(
                          pushop.repo)]
        if not cgversions:
            raise ValueError(_('no common changegroup version'))
        version = max(cgversions)
    cg = changegroup.getlocalchangegroupraw(pushop.repo, 'push',
                                            pushop.outgoing,
                                            version=version)
    cgpart = bundler.newpart('changegroup', data=cg)
    if cgversions:
        cgpart.addparam('version', version)
    if 'treemanifest' in pushop.repo.requirements:
        cgpart.addparam('treemanifest', '1')
    def handlereply(op):
        """extract addchangegroup returns from server reply"""
        cgreplies = op.records.getreplies(cgpart.id)
        assert len(cgreplies['changegroup']) == 1
        pushop.cgresult = cgreplies['changegroup'][0]['return']
    return handlereply

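# Simplified sketch of the version negotiation above (illustration only;
# the real code raises when the remote advertises versions but none are
# usable): '01' is the fallback, otherwise the highest common version wins.
def _demopickcgversion(remoteversions, localversions):
    common = [v for v in remoteversions if v in localversions]
    return max(common) if common else '01'

# _demopickcgversion(['01', '02'], {'01', '02', '03'}) -> '02'
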
@b2partsgenerator('phase')
def _pushb2phases(pushop, bundler):
    """handle phase push through bundle2"""
    if 'phases' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    if 'pushkey' not in b2caps:
        return
    pushop.stepsdone.add('phases')
    part2node = []

    def handlefailure(pushop, exc):
        targetid = int(exc.partid)
        for partid, node in part2node:
            if partid == targetid:
                raise error.Abort(_('updating %s to public failed') % node)

    enc = pushkey.encode
    for newremotehead in pushop.outdatedphases:
        part = bundler.newpart('pushkey')
        part.addparam('namespace', enc('phases'))
        part.addparam('key', enc(newremotehead.hex()))
        part.addparam('old', enc(str(phases.draft)))
        part.addparam('new', enc(str(phases.public)))
        part2node.append((part.id, newremotehead))
        pushop.pkfailcb[part.id] = handlefailure

    def handlereply(op):
        for partid, node in part2node:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            assert len(results) <= 1
            msg = None
            if not results:
                msg = _('server ignored update of %s to public!\n') % node
            elif not int(results[0]['return']):
                msg = _('updating %s to public failed!\n') % node
            if msg is not None:
                pushop.ui.warn(msg)
    return handlereply

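# Illustration (hypothetical helper and hex node): each outdated head
# becomes one 'pushkey' part moving it from draft to public; in
# mercurial/phases.py public is 0 and draft is 1.
def _demophasepushkeyparams(hexnode):
    return [('namespace', 'phases'),
            ('key', hexnode),
            ('old', '1'),   # str(phases.draft)
            ('new', '0')]   # str(phases.public)
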
@b2partsgenerator('obsmarkers')
def _pushb2obsmarkers(pushop, bundler):
    if 'obsmarkers' in pushop.stepsdone:
        return
    remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
    if obsolete.commonversion(remoteversions) is None:
        return
    pushop.stepsdone.add('obsmarkers')
    if pushop.outobsmarkers:
        markers = sorted(pushop.outobsmarkers)
        bundle2.buildobsmarkerspart(bundler, markers)

@b2partsgenerator('bookmarks')
def _pushb2bookmarks(pushop, bundler):
    """handle bookmark push through bundle2"""
    if 'bookmarks' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    if 'pushkey' not in b2caps:
        return
    pushop.stepsdone.add('bookmarks')
    part2book = []
    enc = pushkey.encode

    def handlefailure(pushop, exc):
        targetid = int(exc.partid)
        for partid, book, action in part2book:
            if partid == targetid:
                raise error.Abort(bookmsgmap[action][1].rstrip() % book)
        # we should not be called for parts we did not generate
        assert False

    for book, old, new in pushop.outbookmarks:
        part = bundler.newpart('pushkey')
        part.addparam('namespace', enc('bookmarks'))
        part.addparam('key', enc(book))
        part.addparam('old', enc(old))
        part.addparam('new', enc(new))
        action = 'update'
        if not old:
            action = 'export'
        elif not new:
            action = 'delete'
        part2book.append((part.id, book, action))
        pushop.pkfailcb[part.id] = handlefailure

    def handlereply(op):
        ui = pushop.ui
        for partid, book, action in part2book:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            assert len(results) <= 1
            if not results:
                pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
            else:
                ret = int(results[0]['return'])
                if ret:
                    ui.status(bookmsgmap[action][0] % book)
                else:
                    ui.warn(bookmsgmap[action][1] % book)
                    if pushop.bkresult is not None:
                        pushop.bkresult = 1
    return handlereply

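# Tiny sketch of the action classification used above and again in
# _pushbookmark(): an empty old node means the bookmark is new on the
# remote ('export'), an empty new node means it is being removed
# ('delete'), anything else is a move ('update').
def _demobookmarkaction(old, new):
    if not old:
        return 'export'
    elif not new:
        return 'delete'
    return 'update'
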
@b2partsgenerator('pushvars', idx=0)
def _getbundlesendvars(pushop, bundler):
    '''send shellvars via bundle2'''
    pushvars = pushop.pushvars
    if pushvars:
        shellvars = {}
        for raw in pushvars:
            if '=' not in raw:
                msg = ("unable to parse variable '%s', should follow "
                       "'KEY=VALUE' or 'KEY=' format")
                raise error.Abort(msg % raw)
            k, v = raw.split('=', 1)
            shellvars[k] = v

        part = bundler.newpart('pushvars')

        for key, value in shellvars.iteritems():
            part.addparam(key, value, mandatory=False)

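# Runnable sketch (hypothetical helper) of the pushvars parsing above:
# 'KEY=VALUE' and 'KEY=' are accepted, anything without '=' is rejected.
def _demoparsepushvars(pushvars):
    shellvars = {}
    for raw in pushvars:
        if '=' not in raw:
            raise ValueError("unable to parse variable %r" % raw)
        k, v = raw.split('=', 1)
        shellvars[k] = v
    return shellvars

# _demoparsepushvars(['DEBUG=1', 'REASON=']) -> {'DEBUG': '1', 'REASON': ''}
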
def _pushbundle2(pushop):
    """push data to the remote using bundle2

    The only currently supported type of data is changegroup, but this will
    evolve in the future."""
    bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
    pushback = (pushop.trmanager
                and pushop.ui.configbool('experimental', 'bundle2.pushback'))

    # create reply capability
    capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo,
                                                      allowpushback=pushback))
    bundler.newpart('replycaps', data=capsblob)
    replyhandlers = []
    for partgenname in b2partsgenorder:
        partgen = b2partsgenmapping[partgenname]
        ret = partgen(pushop, bundler)
        if callable(ret):
            replyhandlers.append(ret)
    # do not push if nothing to push
    if bundler.nbparts <= 1:
        return
    stream = util.chunkbuffer(bundler.getchunks())
    try:
        try:
            reply = pushop.remote.unbundle(
                stream, ['force'], pushop.remote.url())
        except error.BundleValueError as exc:
            raise error.Abort(_('missing support for %s') % exc)
        try:
            trgetter = None
            if pushback:
                trgetter = pushop.trmanager.transaction
            op = bundle2.processbundle(pushop.repo, reply, trgetter)
        except error.BundleValueError as exc:
            raise error.Abort(_('missing support for %s') % exc)
        except bundle2.AbortFromPart as exc:
            pushop.ui.status(_('remote: %s\n') % exc)
            if exc.hint is not None:
                pushop.ui.status(_('remote: %s\n') % ('(%s)' % exc.hint))
            raise error.Abort(_('push failed on remote'))
    except error.PushkeyFailed as exc:
        partid = int(exc.partid)
        if partid not in pushop.pkfailcb:
            raise
        pushop.pkfailcb[partid](pushop, exc)
    for rephand in replyhandlers:
        rephand(op)

def _pushchangeset(pushop):
    """Make the actual push of a changeset bundle to the remote repo"""
    if 'changesets' in pushop.stepsdone:
        return
    pushop.stepsdone.add('changesets')
    if not _pushcheckoutgoing(pushop):
        return

    # Should have verified this in push().
    assert pushop.remote.capable('unbundle')

    pushop.repo.prepushoutgoinghooks(pushop)
    outgoing = pushop.outgoing
    # TODO: get bundlecaps from remote
    bundlecaps = None
    # create a changegroup from local
    if pushop.revs is None and not (outgoing.excluded
                                    or pushop.repo.changelog.filteredrevs):
        # push everything,
        # use the fast path, no race possible on push
        bundler = changegroup.cg1packer(pushop.repo, bundlecaps)
        cg = changegroup.getsubset(pushop.repo,
                                   outgoing,
                                   bundler,
                                   'push',
                                   fastpath=True)
    else:
        cg = changegroup.getchangegroup(pushop.repo, 'push', outgoing,
                                        bundlecaps=bundlecaps)

    # apply changegroup to remote
    # local repo finds heads on server, finds out what
    # revs it must push. once revs transferred, if server
    # finds it has different heads (someone else won
    # commit/push race), server aborts.
    if pushop.force:
        remoteheads = ['force']
    else:
        remoteheads = pushop.remoteheads
    # ssh: return remote's addchangegroup()
    # http: return remote's addchangegroup() or 0 for error
    pushop.cgresult = pushop.remote.unbundle(cg, remoteheads,
                                             pushop.repo.url())

def _pushsyncphase(pushop):
    """synchronise phase information locally and remotely"""
    cheads = pushop.commonheads
    # even when we don't push, exchanging phase data is useful
    remotephases = pushop.remote.listkeys('phases')
    if (pushop.ui.configbool('ui', '_usedassubrepo')
        and remotephases    # server supports phases
        and pushop.cgresult is None # nothing was pushed
        and remotephases.get('publishing', False)):
        # When:
        # - this is a subrepo push
        # - and the remote supports phases
        # - and no changeset was pushed
        # - and the remote is publishing
        # We may be in issue 3871 case!
        # We drop the possible phase synchronisation done by
        # courtesy to publish changesets possibly locally draft
        # on the remote.
        remotephases = {'publishing': 'True'}
    if not remotephases: # old server or public only reply from non-publishing
        _localphasemove(pushop, cheads)
        # don't push any phase data as there is nothing to push
    else:
        ana = phases.analyzeremotephases(pushop.repo, cheads,
                                         remotephases)
        pheads, droots = ana
        ### Apply remote phase on local
        if remotephases.get('publishing', False):
            _localphasemove(pushop, cheads)
        else: # publish = False
            _localphasemove(pushop, pheads)
            _localphasemove(pushop, cheads, phases.draft)
        ### Apply local phase on remote

        if pushop.cgresult:
            if 'phases' in pushop.stepsdone:
                # phases already pushed through bundle2
                return
            outdated = pushop.outdatedphases
        else:
            outdated = pushop.fallbackoutdatedphases

        pushop.stepsdone.add('phases')

        # filter heads already turned public by the push
        outdated = [c for c in outdated if c.node() not in pheads]
        # fallback to independent pushkey command
        for newremotehead in outdated:
            r = pushop.remote.pushkey('phases',
                                      newremotehead.hex(),
                                      str(phases.draft),
                                      str(phases.public))
            if not r:
                pushop.ui.warn(_('updating %s to public failed!\n')
                               % newremotehead)

def _localphasemove(pushop, nodes, phase=phases.public):
    """move <nodes> to <phase> in the local source repo"""
    if pushop.trmanager:
        phases.advanceboundary(pushop.repo,
                               pushop.trmanager.transaction(),
                               phase,
                               nodes)
    else:
        # repo is not locked, do not change any phases!
        # Inform the user that phases should have been moved when
        # applicable.
        actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
        phasestr = phases.phasenames[phase]
        if actualmoves:
            pushop.ui.status(_('cannot lock source repo, skipping '
                               'local %s phase update\n') % phasestr)

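# Note, as a sketch: phases are ordered public(0) < draft(1) < secret(2), so
# the 'phase < current' test above selects exactly the nodes that would
# move toward a more public phase (the helper below is made up).
def _demowouldmove(targetphase, currentphases):
    return [p for p in currentphases if targetphase < p]

# _demowouldmove(0, [0, 1, 2]) -> [1, 2] (draft and secret would move)
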
def _pushobsolete(pushop):
    """utility function to push obsolete markers to a remote"""
    if 'obsmarkers' in pushop.stepsdone:
        return
    repo = pushop.repo
    remote = pushop.remote
    pushop.stepsdone.add('obsmarkers')
    if pushop.outobsmarkers:
        pushop.ui.debug('try to push obsolete markers to remote\n')
        rslts = []
        remotedata = obsolete._pushkeyescape(sorted(pushop.outobsmarkers))
        for key in sorted(remotedata, reverse=True):
            # reverse sort to ensure we end with dump0
            data = remotedata[key]
            rslts.append(remote.pushkey('obsolete', key, '', data))
        if [r for r in rslts if not r]:
            msg = _('failed to push some obsolete markers!\n')
            repo.ui.warn(msg)

def _pushbookmark(pushop):
    """Update bookmark position on remote"""
    if pushop.cgresult == 0 or 'bookmarks' in pushop.stepsdone:
        return
    pushop.stepsdone.add('bookmarks')
    ui = pushop.ui
    remote = pushop.remote

    for b, old, new in pushop.outbookmarks:
        action = 'update'
        if not old:
            action = 'export'
        elif not new:
            action = 'delete'
        if remote.pushkey('bookmarks', b, old, new):
            ui.status(bookmsgmap[action][0] % b)
        else:
            ui.warn(bookmsgmap[action][1] % b)
            # discovery can have set the value from an invalid entry
            if pushop.bkresult is not None:
                pushop.bkresult = 1

class pulloperation(object):
    """An object that represents a single pull operation

    Its purpose is to carry pull-related state and very common operations.

    A new one should be created at the beginning of each pull and discarded
    afterward.
    """

    def __init__(self, repo, remote, heads=None, force=False, bookmarks=(),
                 remotebookmarks=None, streamclonerequested=None):
        # repo we pull into
        self.repo = repo
        # repo we pull from
        self.remote = remote
        # revision we try to pull (None is "all")
        self.heads = heads
        # bookmarks pulled explicitly
        self.explicitbookmarks = [repo._bookmarks.expandname(bookmark)
                                  for bookmark in bookmarks]
        # do we force pull?
        self.force = force
        # whether a streaming clone was requested
        self.streamclonerequested = streamclonerequested
        # transaction manager
        self.trmanager = None
        # set of common changesets between local and remote before pull
        self.common = None
        # set of pulled heads
        self.rheads = None
        # list of missing changesets to fetch remotely
        self.fetch = None
        # remote bookmarks data
        self.remotebookmarks = remotebookmarks
        # result of changegroup pulling (used as return code by pull)
        self.cgresult = None
        # list of steps already done
        self.stepsdone = set()
        # Whether we attempted a clone from pre-generated bundles.
        self.clonebundleattempted = False

    @util.propertycache
    def pulledsubset(self):
        """heads of the set of changesets targeted by the pull"""
        # compute target subset
        if self.heads is None:
            # We pulled everything possible
            # sync on everything common
            c = set(self.common)
            ret = list(self.common)
            for n in self.rheads:
                if n not in c:
                    ret.append(n)
            return ret
        else:
            # We pulled a specific subset
            # sync on this subset
            return self.heads

    @util.propertycache
    def canusebundle2(self):
        return not _forcebundle1(self)

    @util.propertycache
    def remotebundle2caps(self):
        return bundle2.bundle2caps(self.remote)

    def gettransaction(self):
        # deprecated; talk to trmanager directly
        return self.trmanager.transaction()

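# Self-contained sketch of pulledsubset above, with opaque strings standing
# in for binary nodes: a full pull (heads is None) syncs on the common set
# plus any remote heads not already in it; a partial pull syncs only on the
# requested heads.
def _demopulledsubset(heads, common, rheads):
    if heads is None:
        c = set(common)
        return list(common) + [n for n in rheads if n not in c]
    return heads

# _demopulledsubset(None, ['n1'], ['n1', 'n2']) -> ['n1', 'n2']
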
class transactionmanager(util.transactional):
    """An object to manage the life cycle of a transaction

    It creates the transaction on demand and calls the appropriate hooks when
    closing the transaction."""
    def __init__(self, repo, source, url):
        self.repo = repo
        self.source = source
        self.url = url
        self._tr = None

    def transaction(self):
        """Return an open transaction object, constructing if necessary"""
        if not self._tr:
            trname = '%s\n%s' % (self.source, util.hidepassword(self.url))
            self._tr = self.repo.transaction(trname)
            self._tr.hookargs['source'] = self.source
            self._tr.hookargs['url'] = self.url
        return self._tr

    def close(self):
        """close transaction if created"""
        if self._tr is not None:
            self._tr.close()

    def release(self):
        """release transaction if created"""
        if self._tr is not None:
            self._tr.release()

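# Usage sketch (hypothetical caller; it is assumed here that
# util.transactional supplies the surrounding machinery that calls close()
# on success and release() otherwise, which is how pull() below drives it
# via lockmod.release):
#
#     tm = transactionmanager(repo, 'pull', remote.url())
#     tm.transaction()   # opened lazily on first use
#     tm.close()         # commit; or tm.release() to roll back
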
def pull(repo, remote, heads=None, force=False, bookmarks=(), opargs=None,
         streamclonerequested=None):
    """Fetch repository data from a remote.

    This is the main function used to retrieve data from a remote repository.

    ``repo`` is the local repository to clone into.
    ``remote`` is a peer instance.
    ``heads`` is an iterable of revisions we want to pull. ``None`` (the
    default) means to pull everything from the remote.
    ``bookmarks`` is an iterable of bookmarks requested to be pulled. By
    default, all remote bookmarks are pulled.
    ``opargs`` are additional keyword arguments to pass to ``pulloperation``
    initialization.
    ``streamclonerequested`` is a boolean indicating whether a "streaming
    clone" is requested. A "streaming clone" is essentially a raw file copy
    of revlogs from the server. This only works when the local repository is
    empty. The default value of ``None`` means to respect the server
    configuration for preferring stream clones.

    Returns the ``pulloperation`` created for this pull.
    """
    if opargs is None:
        opargs = {}
    pullop = pulloperation(repo, remote, heads, force, bookmarks=bookmarks,
                           streamclonerequested=streamclonerequested, **opargs)

    peerlocal = pullop.remote.local()
    if peerlocal:
        missing = set(peerlocal.requirements) - pullop.repo.supported
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise error.Abort(msg)

    wlock = lock = None
    try:
        wlock = pullop.repo.wlock()
        lock = pullop.repo.lock()
        pullop.trmanager = transactionmanager(repo, 'pull', remote.url())
        streamclone.maybeperformlegacystreamclone(pullop)
        # This should ideally be in _pullbundle2(). However, it needs to run
        # before discovery to avoid extra work.
        _maybeapplyclonebundle(pullop)
        _pulldiscovery(pullop)
        if pullop.canusebundle2:
            _pullbundle2(pullop)
        _pullchangeset(pullop)
        _pullphase(pullop)
        _pullbookmarks(pullop)
        _pullobsolete(pullop)
        pullop.trmanager.close()
    finally:
        lockmod.release(pullop.trmanager, lock, wlock)

    return pullop

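# Hypothetical caller sketch (repo path and URL are made up, and the exact
# peer-construction API is an assumption): how command-level code would
# typically drive this function.
#
#     from mercurial import hg, ui as uimod
#     repo = hg.repository(uimod.ui.load(), '/path/to/local/repo')
#     other = hg.peer(repo.ui, {}, 'https://example.com/repo')
#     pullop = pull(repo, other)
#     pullop.cgresult    # 0 -> no changes found
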
# list of steps to perform discovery before pull
pulldiscoveryorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
pulldiscoverymapping = {}

def pulldiscovery(stepname):
    """decorator for functions performing discovery before pull

    The function is added to the step -> function mapping and appended to the
    list of steps. Beware that decorated functions will be added in order
    (this may matter).

    You can only use this decorator for a new step; if you want to wrap a step
    from an extension, change the pulldiscoverymapping dictionary directly."""
    def dec(func):
        assert stepname not in pulldiscoverymapping
        pulldiscoverymapping[stepname] = func
        pulldiscoveryorder.append(stepname)
        return func
    return dec

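# Hypothetical extension sketch (the step name and body are made up): this
# is how a brand-new discovery step would be registered with the decorator
# above; it then runs in registration order during _pulldiscovery() below.
@pulldiscovery('example:noop')
def _pulldiscoverynoopexample(pullop):
    pullop.repo.ui.debug('example discovery step, doing nothing\n')
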
def _pulldiscovery(pullop):
    """Run all discovery steps"""
    for stepname in pulldiscoveryorder:
        step = pulldiscoverymapping[stepname]
        step(pullop)

@pulldiscovery('b1:bookmarks')
def _pullbookmarkbundle1(pullop):
    """fetch bookmark data in the bundle1 case

    If not using bundle2, we have to fetch bookmarks before changeset
    discovery to reduce the chance and impact of race conditions."""
    if pullop.remotebookmarks is not None:
        return
    if pullop.canusebundle2 and 'listkeys' in pullop.remotebundle2caps:
        # all known bundle2 servers now support listkeys, but let's be nice
        # with new implementations.
        return
    pullop.remotebookmarks = pullop.remote.listkeys('bookmarks')


@pulldiscovery('changegroup')
def _pulldiscoverychangegroup(pullop):
    """discovery phase for the pull

    Currently handles changeset discovery only; will be changed to handle
    all discovery at some point."""
    tmp = discovery.findcommonincoming(pullop.repo,
                                       pullop.remote,
                                       heads=pullop.heads,
                                       force=pullop.force)
    common, fetch, rheads = tmp
    nm = pullop.repo.unfiltered().changelog.nodemap
    if fetch and rheads:
        # If a remote head is filtered locally, let's drop it from the
        # unknown remote heads and put it back in common.
        #
        # This is a hackish solution to catch most of the "common but
        # locally hidden" situations. We do not perform discovery on the
        # unfiltered repository because it ends up doing a pathological
        # amount of round trips for a huge amount of changesets we do not
        # care about.
        #
        # If a set of such "common but filtered" changesets exists on the
        # server but does not include a remote head, we'll not be able to
        # detect it.
        scommon = set(common)
        filteredrheads = []
        for n in rheads:
            if n in nm:
                if n not in scommon:
                    common.append(n)
            else:
                filteredrheads.append(n)
        if not filteredrheads:
            fetch = []
        rheads = filteredrheads
    pullop.common = common
    pullop.fetch = fetch
    pullop.rheads = rheads

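# Runnable sketch of the head filtering above, with string stand-ins for
# nodes: remote heads already known locally (present in 'knownnodes', the
# analogue of the nodemap) are moved into 'common'; only truly unknown
# heads remain to be fetched.
def _demofilterrheads(common, rheads, knownnodes):
    scommon = set(common)
    filtered = []
    for n in rheads:
        if n in knownnodes:
            if n not in scommon:
                common.append(n)
        else:
            filtered.append(n)
    return common, filtered

# _demofilterrheads(['a'], ['a', 'b', 'c'], {'a', 'b'}) -> (['a', 'b'], ['c'])
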
1346 def _pullbundle2(pullop):
1346 def _pullbundle2(pullop):
1347 """pull data using bundle2
1347 """pull data using bundle2
1348
1348
1349 For now, the only supported data are changegroup."""
1349 For now, the only supported data are changegroup."""
1350 kwargs = {'bundlecaps': caps20to10(pullop.repo)}
1350 kwargs = {'bundlecaps': caps20to10(pullop.repo)}
1351
1351
1352 # At the moment we don't do stream clones over bundle2. If that is
1352 # At the moment we don't do stream clones over bundle2. If that is
1353 # implemented then here's where the check for that will go.
1353 # implemented then here's where the check for that will go.
1354 streaming = False
1354 streaming = False
1355
1355
1356 # pulling changegroup
1356 # pulling changegroup
1357 pullop.stepsdone.add('changegroup')
1357 pullop.stepsdone.add('changegroup')
1358
1358
1359 kwargs['common'] = pullop.common
1359 kwargs['common'] = pullop.common
1360 kwargs['heads'] = pullop.heads or pullop.rheads
1360 kwargs['heads'] = pullop.heads or pullop.rheads
1361 kwargs['cg'] = pullop.fetch
1361 kwargs['cg'] = pullop.fetch
1362 if 'listkeys' in pullop.remotebundle2caps:
1362 if 'listkeys' in pullop.remotebundle2caps:
1363 kwargs['listkeys'] = ['phases']
1363 kwargs['listkeys'] = ['phases']
1364 if pullop.remotebookmarks is None:
1364 if pullop.remotebookmarks is None:
1365 # make sure to always includes bookmark data when migrating
1365 # make sure to always includes bookmark data when migrating
1366 # `hg incoming --bundle` to using this function.
1366 # `hg incoming --bundle` to using this function.
1367 kwargs['listkeys'].append('bookmarks')
1367 kwargs['listkeys'].append('bookmarks')
1368
1368
    # If this is a full pull / clone and the server supports the clone bundles
    # feature, tell the server whether we attempted a clone bundle. The
    # presence of this flag indicates the client supports clone bundles. This
    # will enable the server to treat clients that support clone bundles
    # differently from those that don't.
    if (pullop.remote.capable('clonebundles')
        and pullop.heads is None and list(pullop.common) == [nullid]):
        kwargs['cbattempted'] = pullop.clonebundleattempted

    if streaming:
        pullop.repo.ui.status(_('streaming all changes\n'))
    elif not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
    else:
        if pullop.heads is None and list(pullop.common) == [nullid]:
            pullop.repo.ui.status(_("requesting all changes\n"))
    if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
        remoteversions = bundle2.obsmarkersversion(pullop.remotebundle2caps)
        if obsolete.commonversion(remoteversions) is not None:
            kwargs['obsmarkers'] = True
            pullop.stepsdone.add('obsmarkers')
    _pullbundle2extraprepare(pullop, kwargs)
    bundle = pullop.remote.getbundle('pull', **pycompat.strkwargs(kwargs))
    try:
        op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
    except bundle2.AbortFromPart as exc:
        pullop.repo.ui.status(_('remote: abort: %s\n') % exc)
        raise error.Abort(_('pull failed on remote'), hint=exc.hint)
    except error.BundleValueError as exc:
        raise error.Abort(_('missing support for %s') % exc)

    if pullop.fetch:
        pullop.cgresult = bundle2.combinechangegroupresults(op)

    # If the bundle had a phase-heads part, then phase exchange is already done
    if op.records['phase-heads']:
        pullop.stepsdone.add('phases')

    # processing phases change
    for namespace, value in op.records['listkeys']:
        if namespace == 'phases':
            _pullapplyphases(pullop, value)

    # processing bookmark update
    for namespace, value in op.records['listkeys']:
        if namespace == 'bookmarks':
            pullop.remotebookmarks = value

    # bookmark data were either already there or pulled in the bundle
    if pullop.remotebookmarks is not None:
        _pullbookmarks(pullop)

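# Illustrative sketch (simplified, hypothetical handlers): why recording
# 'phases' in stepsdone suppresses the legacy exchange. Every pull step
# checks the set before doing work, so a phase-heads part in the bundle
# makes the later listkeys-based phase step a no-op.
def _example_stepsdone_guard():
    stepsdone = set()

    def on_phase_heads_part():
        # the bundle carried a phase-heads part: phases already updated
        stepsdone.add('phases')

    def pullapplyphases():
        if 'phases' in stepsdone:
            return 'skipped'
        stepsdone.add('phases')
        return 'applied'

    on_phase_heads_part()
    assert pullapplyphases() == 'skipped'
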
def _pullbundle2extraprepare(pullop, kwargs):
    """hook function so that extensions can extend the getbundle call"""
    pass

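# Illustrative sketch: how an extension might hook the function above to
# request an extra bundle2 part. extensions.wrapfunction is real Mercurial
# API; the 'myextradata' capability and kwarg are made up and would need
# matching server-side support.
def _example_extsetup(ui):
    from mercurial import exchange, extensions

    def _extraprepare(orig, pullop, kwargs):
        orig(pullop, kwargs)
        if 'myextradata' in pullop.remotebundle2caps:
            kwargs['myextradata'] = True

    extensions.wrapfunction(exchange, '_pullbundle2extraprepare',
                            _extraprepare)
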
def _pullchangeset(pullop):
    """pull changesets from unbundle into the local repo"""
    # We delay opening the transaction as late as possible so we don't open
    # a transaction for nothing and don't break a future useful rollback call
    if 'changegroup' in pullop.stepsdone:
        return
    pullop.stepsdone.add('changegroup')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
        return
    tr = pullop.gettransaction()
    if pullop.heads is None and list(pullop.common) == [nullid]:
        pullop.repo.ui.status(_("requesting all changes\n"))
    elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
        # issue1320, avoid a race if remote changed after discovery
        pullop.heads = pullop.rheads

    if pullop.remote.capable('getbundle'):
        # TODO: get bundlecaps from remote
        cg = pullop.remote.getbundle('pull', common=pullop.common,
                                     heads=pullop.heads or pullop.rheads)
    elif pullop.heads is None:
        cg = pullop.remote.changegroup(pullop.fetch, 'pull')
    elif not pullop.remote.capable('changegroupsubset'):
        raise error.Abort(_("partial pull cannot be done because "
                            "other repository doesn't support "
                            "changegroupsubset."))
    else:
        cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
    bundleop = bundle2.applybundle(pullop.repo, cg, tr, 'pull',
                                   pullop.remote.url())
    pullop.cgresult = bundle2.combinechangegroupresults(bundleop)

def _pullphase(pullop):
    # Get phases data from the remote
    if 'phases' in pullop.stepsdone:
        return
    remotephases = pullop.remote.listkeys('phases')
    _pullapplyphases(pullop, remotephases)

def _pullapplyphases(pullop, remotephases):
    """apply phase movement from observed remote state"""
    if 'phases' in pullop.stepsdone:
        return
    pullop.stepsdone.add('phases')
    publishing = bool(remotephases.get('publishing', False))
    if remotephases and not publishing:
        # remote is new and non-publishing
        pheads, _dr = phases.analyzeremotephases(pullop.repo,
                                                 pullop.pulledsubset,
                                                 remotephases)
        dheads = pullop.pulledsubset
    else:
        # Remote is old or publishing all common changesets
        # should be seen as public
        pheads = pullop.pulledsubset
        dheads = []
    unfi = pullop.repo.unfiltered()
    phase = unfi._phasecache.phase
    rev = unfi.changelog.nodemap.get
    public = phases.public
    draft = phases.draft

    # exclude changesets already public locally and update the others
    pheads = [pn for pn in pheads if phase(unfi, rev(pn)) > public]
    if pheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, public, pheads)

    # exclude changesets already draft locally and update the others
    dheads = [pn for pn in dheads if phase(unfi, rev(pn)) > draft]
    if dheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, draft, dheads)

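# Illustrative sketch (hypothetical nodes): phases only move toward public
# during pull. With public=0 < draft=1 < secret=2, a head is advanced only
# when its local phase value is strictly greater than the target phase.
def _example_phase_advance():
    public, draft, secret = 0, 1, 2
    localphase = {b'a': secret, b'b': draft, b'c': public}
    pheads = [n for n in (b'a', b'b', b'c') if localphase[n] > public]
    # b'c' is already public locally, so only a and b get advanced
    assert pheads == [b'a', b'b']
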
def _pullbookmarks(pullop):
    """process the remote bookmark information to update the local one"""
    if 'bookmarks' in pullop.stepsdone:
        return
    pullop.stepsdone.add('bookmarks')
    repo = pullop.repo
    remotebookmarks = pullop.remotebookmarks
    remotebookmarks = bookmod.unhexlifybookmarks(remotebookmarks)
    bookmod.updatefromremote(repo.ui, repo, remotebookmarks,
                             pullop.remote.url(),
                             pullop.gettransaction,
                             explicit=pullop.explicitbookmarks)

def _pullobsolete(pullop):
    """utility function to pull obsolete markers from a remote

    `gettransaction` is a function that returns the pull transaction,
    creating one if necessary. We return the transaction to inform the
    calling code that a new transaction has been created (when applicable).

    Exists mostly to allow overriding for experimentation purposes"""
    if 'obsmarkers' in pullop.stepsdone:
        return
    pullop.stepsdone.add('obsmarkers')
    tr = None
    if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
        pullop.repo.ui.debug('fetching remote obsolete markers\n')
        remoteobs = pullop.remote.listkeys('obsolete')
        if 'dump0' in remoteobs:
            tr = pullop.gettransaction()
            markers = []
            for key in sorted(remoteobs, reverse=True):
                if key.startswith('dump'):
                    data = util.b85decode(remoteobs[key])
                    version, newmarks = obsolete._readmarkers(data)
                    markers += newmarks
            if markers:
                pullop.repo.obsstore.add(tr, markers)
            pullop.repo.invalidatevolatilesets()
    return tr

def caps20to10(repo):
    """return a set with appropriate options to use bundle20 during getbundle"""
    caps = {'HG20'}
    capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
    caps.add('bundle2=' + urlreq.quote(capsblob))
    return caps

# List of names of steps to perform for a bundle2 for getbundle, order matters.
getbundle2partsorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
getbundle2partsmapping = {}

def getbundle2partsgenerator(stepname, idx=None):
    """decorator for functions generating a bundle2 part for getbundle

    The function is added to the step -> function mapping and appended to the
    list of steps. Beware that decorated functions will be added in order
    (this may matter).

    You can only use this decorator for new steps; if you want to wrap a step
    from an extension, modify the getbundle2partsmapping dictionary directly."""
    def dec(func):
        assert stepname not in getbundle2partsmapping
        getbundle2partsmapping[stepname] = func
        if idx is None:
            getbundle2partsorder.append(stepname)
        else:
            getbundle2partsorder.insert(idx, stepname)
        return func
    return dec

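# Illustrative sketch (hypothetical step and part name): registering a new
# part generator with the decorator above. The function is recorded under
# its step name and the step is appended to getbundle2partsorder; passing
# idx=0 instead would schedule it first.
@getbundle2partsgenerator('exampledata')
def _getbundleexampledatapart(bundler, repo, source, bundlecaps=None,
                              b2caps=None, **kwargs):
    """add a made-up 'exampledata' part when the client asked for it"""
    if kwargs.get('exampledata', False):
        bundler.newpart('exampledata', data=b'hello', mandatory=False)
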
def bundle2requested(bundlecaps):
    if bundlecaps is not None:
        return any(cap.startswith('HG2') for cap in bundlecaps)
    return False

def getbundlechunks(repo, source, heads=None, common=None, bundlecaps=None,
                    **kwargs):
    """Return chunks constituting a bundle's raw data.

    Could be a bundle HG10 or a bundle HG20 depending on bundlecaps
    passed.

    Returns an iterator over raw chunks (of varying sizes).
    """
    kwargs = pycompat.byteskwargs(kwargs)
    usebundle2 = bundle2requested(bundlecaps)
    # bundle10 case
    if not usebundle2:
        if bundlecaps and not kwargs.get('cg', True):
            raise ValueError(_('request for bundle10 must include changegroup'))

        if kwargs:
            raise ValueError(_('unsupported getbundle arguments: %s')
                             % ', '.join(sorted(kwargs.keys())))
        outgoing = _computeoutgoing(repo, heads, common)
        bundler = changegroup.getbundler('01', repo, bundlecaps)
        return changegroup.getsubsetraw(repo, outgoing, bundler, source)

    # bundle20 case
    b2caps = {}
    for bcaps in bundlecaps:
        if bcaps.startswith('bundle2='):
            blob = urlreq.unquote(bcaps[len('bundle2='):])
            b2caps.update(bundle2.decodecaps(blob))
    bundler = bundle2.bundle20(repo.ui, b2caps)

    kwargs['heads'] = heads
    kwargs['common'] = common

    for name in getbundle2partsorder:
        func = getbundle2partsmapping[name]
        func(bundler, repo, source, bundlecaps=bundlecaps, b2caps=b2caps,
             **pycompat.strkwargs(kwargs))

    return bundler.getchunks()

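# Illustrative sketch (hypothetical caller): a server-side user of
# getbundlechunks streams the chunks straight to a file object without
# materializing the whole bundle in memory.
def _example_write_bundle(repo, fh):
    chunks = getbundlechunks(repo, 'serve', heads=None, common=None,
                             bundlecaps={'HG20'})
    for chunk in chunks:
        fh.write(chunk)
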
@getbundle2partsgenerator('changegroup')
def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
                              b2caps=None, heads=None, common=None, **kwargs):
    """add a changegroup part to the requested bundle"""
    cg = None
    if kwargs.get('cg', True):
        # build changegroup bundle here.
        version = '01'
        cgversions = b2caps.get('changegroup')
        if cgversions: # 3.1 and 3.2 ship with an empty value
            cgversions = [v for v in cgversions
                          if v in changegroup.supportedoutgoingversions(repo)]
            if not cgversions:
                raise ValueError(_('no common changegroup version'))
            version = max(cgversions)
        outgoing = _computeoutgoing(repo, heads, common)
        cg = changegroup.getlocalchangegroupraw(repo, source, outgoing,
                                                bundlecaps=bundlecaps,
                                                version=version)

    if cg:
        part = bundler.newpart('changegroup', data=cg)
        if cgversions:
            part.addparam('version', version)
        part.addparam('nbchanges', str(len(outgoing.missing)), mandatory=False)
        if 'treemanifest' in repo.requirements:
            part.addparam('treemanifest', '1')

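# Illustrative sketch (stand-in version strings): the changegroup version
# negotiation above in isolation. Client-advertised versions are
# intersected with the versions the server can emit; the highest surviving
# version wins, and no overlap is an error.
def _example_negotiate_cg_version(clientversions, serverversions):
    candidates = [v for v in clientversions if v in serverversions]
    if not candidates:
        raise ValueError('no common changegroup version')
    return max(candidates)

# e.g. _example_negotiate_cg_version(['01', '02'], ['02', '03']) == '02'
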
@getbundle2partsgenerator('listkeys')
def _getbundlelistkeysparts(bundler, repo, source, bundlecaps=None,
                            b2caps=None, **kwargs):
    """add parts containing listkeys namespaces to the requested bundle"""
    listkeys = kwargs.get('listkeys', ())
    for namespace in listkeys:
        part = bundler.newpart('listkeys')
        part.addparam('namespace', namespace)
        keys = repo.listkeys(namespace).items()
        part.data = pushkey.encodekeys(keys)

@getbundle2partsgenerator('obsmarkers')
def _getbundleobsmarkerpart(bundler, repo, source, bundlecaps=None,
                            b2caps=None, heads=None, **kwargs):
    """add an obsolescence markers part to the requested bundle"""
    if kwargs.get('obsmarkers', False):
        if heads is None:
            heads = repo.heads()
        subset = [c.node() for c in repo.set('::%ln', heads)]
        markers = repo.obsstore.relevantmarkers(subset)
        markers = sorted(markers)
        bundle2.buildobsmarkerspart(bundler, markers)

@getbundle2partsgenerator('hgtagsfnodes')
def _getbundletagsfnodes(bundler, repo, source, bundlecaps=None,
                         b2caps=None, heads=None, common=None,
                         **kwargs):
    """Transfer the .hgtags filenodes mapping.

    Only values for heads in this bundle will be transferred.

    The part data consists of pairs of 20 byte changeset node and .hgtags
    filenodes raw values.
    """
    # Don't send unless:
    # - changesets are being exchanged,
    # - the client supports it.
    if not (kwargs.get('cg', True) and 'hgtagsfnodes' in b2caps):
        return

    outgoing = _computeoutgoing(repo, heads, common)
    bundle2.addparttagsfnodescache(repo, bundler, outgoing)

def _getbookmarks(repo, **kwargs):
    """Returns a bookmark-to-node mapping.

    This function is primarily used to generate the `bookmarks` bundle2 part.
    It is a separate function in order to make it easy to wrap it
    in extensions. Passing `kwargs` to the function makes it easy to
    add new parameters in extensions.
    """

    return dict(bookmod.listbinbookmarks(repo))

def check_heads(repo, their_heads, context):
    """check if the heads of a repo have been modified

    Used by peer for unbundling.
    """
    heads = repo.heads()
    heads_hash = hashlib.sha1(''.join(sorted(heads))).digest()
    if not (their_heads == ['force'] or their_heads == heads or
            their_heads == ['hashed', heads_hash]):
        # someone else committed/pushed/unbundled while we
        # were transferring data
        raise error.PushRaced('repository changed while %s - '
                              'please try again' % context)

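# Illustrative sketch (toy nodes): the race check hashes the sorted,
# concatenated head nodes. Recomputing the digest after the transfer
# reveals any commit/push/unbundle that happened in between.
def _example_heads_hash():
    import hashlib
    heads_before = [b'\x02' * 20, b'\x01' * 20]
    expected = hashlib.sha1(b''.join(sorted(heads_before))).digest()
    heads_now = [b'\x01' * 20, b'\x02' * 20]
    assert hashlib.sha1(b''.join(sorted(heads_now))).digest() == expected
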
def unbundle(repo, cg, heads, source, url):
    """Apply a bundle to a repo.

    This function makes sure the repo is locked during the application and
    has a mechanism to check that no push race occurred between the creation
    of the bundle and its application.

    If the push was raced, a PushRaced exception is raised."""
    r = 0
    # need a transaction when processing a bundle2 stream
    # [wlock, lock, tr] - needs to be an array so nested functions can modify it
    lockandtr = [None, None, None]
    recordout = None
    # quick fix for output mismatch with bundle2 in 3.4
    captureoutput = repo.ui.configbool('experimental', 'bundle2-output-capture')
    if url.startswith('remote:http:') or url.startswith('remote:https:'):
        captureoutput = True
    try:
        # note: outside bundle1, 'heads' is expected to be empty and this
        # 'check_heads' call will be a no-op
        check_heads(repo, heads, 'uploading changes')
        # push can proceed
        if not isinstance(cg, bundle2.unbundle20):
            # legacy case: bundle1 (changegroup 01)
            txnname = "\n".join([source, util.hidepassword(url)])
            with repo.lock(), repo.transaction(txnname) as tr:
                op = bundle2.applybundle(repo, cg, tr, source, url)
                r = bundle2.combinechangegroupresults(op)
        else:
            r = None
            try:
                def gettransaction():
                    if not lockandtr[2]:
                        lockandtr[0] = repo.wlock()
                        lockandtr[1] = repo.lock()
                        lockandtr[2] = repo.transaction(source)
                        lockandtr[2].hookargs['source'] = source
                        lockandtr[2].hookargs['url'] = url
                        lockandtr[2].hookargs['bundle2'] = '1'
                    return lockandtr[2]

                # Do greedy locking by default until we're satisfied with lazy
                # locking.
                if not repo.ui.configbool('experimental', 'bundle2lazylocking'):
                    gettransaction()

                op = bundle2.bundleoperation(repo, gettransaction,
                                             captureoutput=captureoutput)
                try:
                    op = bundle2.processbundle(repo, cg, op=op)
                finally:
                    r = op.reply
                    if captureoutput and r is not None:
                        repo.ui.pushbuffer(error=True, subproc=True)
                        def recordout(output):
                            r.newpart('output', data=output, mandatory=False)
                if lockandtr[2] is not None:
                    lockandtr[2].close()
            except BaseException as exc:
                exc.duringunbundle2 = True
                if captureoutput and r is not None:
                    parts = exc._bundle2salvagedoutput = r.salvageoutput()
                    def recordout(output):
                        part = bundle2.bundlepart('output', data=output,
                                                  mandatory=False)
                        parts.append(part)
                raise
    finally:
        lockmod.release(lockandtr[2], lockandtr[1], lockandtr[0])
        if recordout is not None:
            recordout(repo.ui.popbuffer())
    return r

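# Illustrative sketch (hypothetical helpers): the lazy lock/transaction
# idiom used above, reduced to its shape. State lives in a mutable list so
# the nested closure can fill it in on first use; a caller that never asks
# for the transaction never takes the locks.
def _example_lazy_transaction(make_lock, make_tr):
    state = [None, None]  # [lock, tr], mutated by the closure

    def gettransaction():
        if state[1] is None:
            state[0] = make_lock()
            state[1] = make_tr()
        return state[1]

    return gettransaction
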
def _maybeapplyclonebundle(pullop):
    """Apply a clone bundle from a remote, if possible."""

    repo = pullop.repo
    remote = pullop.remote

    if not repo.ui.configbool('ui', 'clonebundles'):
        return

    # Only run if local repo is empty.
    if len(repo):
        return

    if pullop.heads:
        return

    if not remote.capable('clonebundles'):
        return

    res = remote._call('clonebundles')

    # If we call the wire protocol command, that's good enough to record the
    # attempt.
    pullop.clonebundleattempted = True

    entries = parseclonebundlesmanifest(repo, res)
    if not entries:
        repo.ui.note(_('no clone bundles available on remote; '
                       'falling back to regular clone\n'))
        return

    entries = filterclonebundleentries(repo, entries)
    if not entries:
        # There is a thundering herd concern here. However, if a server
        # operator doesn't advertise bundles appropriate for its clients,
        # they deserve what's coming. Furthermore, from a client's
        # perspective, no automatic fallback would mean not being able to
        # clone!
        repo.ui.warn(_('no compatible clone bundles available on server; '
                       'falling back to regular clone\n'))
        repo.ui.warn(_('(you may want to report this to the server '
                       'operator)\n'))
        return

    entries = sortclonebundleentries(repo.ui, entries)

    url = entries[0]['URL']
    repo.ui.status(_('applying clone bundle from %s\n') % url)
    if trypullbundlefromurl(repo.ui, repo, url):
        repo.ui.status(_('finished applying clone bundle\n'))
    # Bundle failed.
    #
    # We abort by default to avoid the thundering herd of
    # clients flooding a server that was expecting expensive
    # clone load to be offloaded.
    elif repo.ui.configbool('ui', 'clonebundlefallback'):
        repo.ui.warn(_('falling back to normal clone\n'))
    else:
        raise error.Abort(_('error applying bundle'),
                          hint=_('if this error persists, consider contacting '
                                 'the server operator or disable clone '
                                 'bundles via '
                                 '"--config ui.clonebundles=false"'))

def parseclonebundlesmanifest(repo, s):
    """Parses the raw text of a clone bundles manifest.

    Returns a list of dicts. The dicts have a ``URL`` key corresponding
    to the URL; the other keys are the attributes for the entry.
    """
    m = []
    for line in s.splitlines():
        fields = line.split()
        if not fields:
            continue
        attrs = {'URL': fields[0]}
        for rawattr in fields[1:]:
            key, value = rawattr.split('=', 1)
            key = urlreq.unquote(key)
            value = urlreq.unquote(value)
            attrs[key] = value

            # Parse BUNDLESPEC into components. This makes client-side
            # preferences easier to specify since you can prefer a single
            # component of the BUNDLESPEC.
            if key == 'BUNDLESPEC':
                try:
                    comp, version, params = parsebundlespec(repo, value,
                                                            externalnames=True)
                    attrs['COMPRESSION'] = comp
                    attrs['VERSION'] = version
                except error.InvalidBundleSpecification:
                    pass
                except error.UnsupportedBundleSpecification:
                    pass

        m.append(attrs)

    return m

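# Illustrative sketch (hypothetical URL and attributes): the shape of one
# manifest line and the entry it parses to. The urlquoting and the derived
# COMPRESSION/VERSION keys from BUNDLESPEC are omitted here for brevity.
def _example_parse_manifest_line():
    line = 'https://cdn.example.com/full.hg BUNDLESPEC=gzip-v2 REQUIRESNI=true'
    fields = line.split()
    attrs = {'URL': fields[0]}
    for rawattr in fields[1:]:
        key, value = rawattr.split('=', 1)
        attrs[key] = value
    assert attrs == {'URL': 'https://cdn.example.com/full.hg',
                     'BUNDLESPEC': 'gzip-v2', 'REQUIRESNI': 'true'}
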
def filterclonebundleentries(repo, entries):
    """Remove incompatible clone bundle manifest entries.

    Accepts a list of entries parsed with ``parseclonebundlesmanifest``
    and returns a new list consisting of only the entries that this client
    should be able to apply.

    There is no guarantee we'll be able to apply all returned entries because
    the metadata we use to filter on may be missing or wrong.
    """
    newentries = []
    for entry in entries:
        spec = entry.get('BUNDLESPEC')
        if spec:
            try:
                parsebundlespec(repo, spec, strict=True)
            except error.InvalidBundleSpecification as e:
                repo.ui.debug(str(e) + '\n')
                continue
            except error.UnsupportedBundleSpecification as e:
                repo.ui.debug('filtering %s because unsupported bundle '
                              'spec: %s\n' % (entry['URL'], str(e)))
                continue

        if 'REQUIRESNI' in entry and not sslutil.hassni:
            repo.ui.debug('filtering %s because SNI not supported\n' %
                          entry['URL'])
            continue

        newentries.append(entry)

    return newentries

class clonebundleentry(object):
    """Represents an item in a clone bundles manifest.

    This rich class is needed to support sorting since sorted() in Python 3
    doesn't support ``cmp`` and our comparison is complex enough that ``key=``
    won't work.
    """

    def __init__(self, value, prefers):
        self.value = value
        self.prefers = prefers

    def _cmp(self, other):
        for prefkey, prefvalue in self.prefers:
            avalue = self.value.get(prefkey)
            bvalue = other.value.get(prefkey)

            # Special case for b missing attribute and a matches exactly.
            if avalue is not None and bvalue is None and avalue == prefvalue:
                return -1

            # Special case for a missing attribute and b matches exactly.
            if bvalue is not None and avalue is None and bvalue == prefvalue:
                return 1

            # We can't compare unless attribute present on both.
            if avalue is None or bvalue is None:
                continue

            # Same values should fall back to next attribute.
            if avalue == bvalue:
                continue

            # Exact matches come first.
            if avalue == prefvalue:
                return -1
            if bvalue == prefvalue:
                return 1

            # Fall back to next attribute.
            continue

        # If we got here we couldn't sort by attributes and prefers. Fall
        # back to index order.
        return 0

    def __lt__(self, other):
        return self._cmp(other) < 0

    def __gt__(self, other):
        return self._cmp(other) > 0

    def __eq__(self, other):
        return self._cmp(other) == 0

    def __le__(self, other):
        return self._cmp(other) <= 0

    def __ge__(self, other):
        return self._cmp(other) >= 0

    def __ne__(self, other):
        return self._cmp(other) != 0

def sortclonebundleentries(ui, entries):
    prefers = ui.configlist('ui', 'clonebundleprefers')
    if not prefers:
        return list(entries)

    prefers = [p.split('=', 1) for p in prefers]

    items = sorted(clonebundleentry(v, prefers) for v in entries)
    return [i.value for i in items]

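# Illustrative sketch (hypothetical entries): how ui.clonebundleprefers
# drives the ordering. With COMPRESSION=zstd preferred, the zstd entry
# sorts first; entries that tie on all preferences keep manifest order.
def _example_sort_by_prefs():
    prefers = [['COMPRESSION', 'zstd']]
    entries = [{'URL': 'a', 'COMPRESSION': 'gzip'},
               {'URL': 'b', 'COMPRESSION': 'zstd'}]
    items = sorted(clonebundleentry(v, prefers) for v in entries)
    assert [i.value['URL'] for i in items] == ['b', 'a']
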
def trypullbundlefromurl(ui, repo, url):
    """Attempt to apply a bundle from a URL."""
    with repo.lock(), repo.transaction('bundleurl') as tr:
        try:
            fh = urlmod.open(ui, url)
            cg = readbundle(ui, fh, 'stream')

            if isinstance(cg, streamclone.streamcloneapplier):
                cg.apply(repo)
            else:
                bundle2.applybundle(repo, cg, tr, 'clonebundles', url)
            return True
        except urlerr.httperror as e:
            ui.warn(_('HTTP error fetching bundle: %s\n') % str(e))
        except urlerr.urlerror as e:
            ui.warn(_('error fetching bundle: %s\n') % e.reason)

    return False