pull: remove inadequate use of operations records to update stepdone...
Boris Feld
r34324:6c7aaf59 default
@@ -1,1924 +1,1923 @@
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application-agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows:

- magic string
- stream level parameters
- payload parts (any number)
- end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

The binary format is as follows:

:params size: int32

    The total number of bytes used by the parameters.

:params value: arbitrary number of bytes

    A blob of `params size` containing the serialized version of all stream
    level parameters.

The blob contains a space separated list of parameters. Parameters with a
value are stored in the form `<name>=<value>`. Both name and value are
urlquoted.

Empty names are forbidden.

Names MUST start with a letter. If the first letter is lower case, the
parameter is advisory and can be safely ignored. However, when the first
letter is capital, the parameter is mandatory and the bundling process MUST
stop if it is not able to process it.

Stream parameters use a simple textual format for two main reasons:

- Stream level parameters should remain simple and we want to discourage any
  crazy usage.
- Textual data allows easy human inspection of a bundle2 header in case of
  trouble.

Any application-level options MUST go into a bundle2 part instead.
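
For illustration, a minimal sketch of serializing such a stream-parameter
block (a hypothetical helper, not part of this module's API; Python 3 stdlib
used for brevity)::

    import struct
    import urllib.parse

    def encodestreamparams(params):
        # params: dict of name -> value-or-None; names and values urlquoted
        entries = []
        for name, value in sorted(params.items()):
            entry = urllib.parse.quote(name)
            if value is not None:
                entry = '%s=%s' % (entry, urllib.parse.quote(value))
            entries.append(entry)
        blob = ' '.join(entries).encode('ascii')
        # int32 size prefix, then the blob itself
        return struct.pack('>i', len(blob)) + blob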

Payload part
------------------------

The binary format is as follows:

:header size: int32

    The total number of bytes used by the part header. When the header is
    empty (size = 0) this is interpreted as the end of stream marker.

:header:

    The header defines how to interpret the part. It contains two pieces of
    data: the part type, and the part parameters.

    The part type is used to route the part to an application-level handler
    that can interpret the payload.

    Part parameters are passed to the application-level handler. They are
    meant to convey information that will help the application-level object
    interpret the part payload.

    The binary format of the header is as follows:

    :typesize: (one byte)

    :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

    :partid: A 32-bit integer (unique in the bundle) that can be used to
             refer to this part.

    :parameters:

        Part parameters may have arbitrary content, the binary structure is::

            <mandatory-count><advisory-count><param-sizes><param-data>

        :mandatory-count: 1 byte, number of mandatory parameters

        :advisory-count: 1 byte, number of advisory parameters

        :param-sizes:

            N pairs of bytes, where N is the total number of parameters. Each
            pair contains (<size-of-key>, <size-of-value>) for one parameter.

        :param-data:

            A blob of bytes from which each parameter key and value can be
            retrieved using the list of size pairs stored in the previous
            field.

    Mandatory parameters come first, then the advisory ones.

    Each parameter's key MUST be unique within the part.

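    For illustration, a sketch of packing such a parameter block for two
    mandatory parameters and one advisory one (hypothetical names and values;
    Python 3 syntax)::

        import struct

        pairs = [('cg', '1'), ('version', '02'), ('hint', 'x')]
        counts = struct.pack('>BB', 2, 1)  # two mandatory, one advisory
        sizes = struct.pack('>' + 'BB' * len(pairs),
                            *[n for kv in pairs for n in map(len, kv)])
        data = ''.join(k + v for k, v in pairs).encode('ascii')
        paramblock = counts + sizes + data
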
:payload:

    payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.
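
    For illustration, a sketch of reading such a chunk stream (a hypothetical
    helper; real consumption happens in the classes below)::

        import struct

        def iterpayloadchunks(fp):
            # yield chunk data until the zero-size terminator
            while True:
                size = struct.unpack('>i', fp.read(4))[0]
                if size == 0:
                    return
                if size < 0:
                    raise ValueError('special chunk sizes not handled here')
                yield fp.read(size)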

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part
type contains any uppercase character it is considered mandatory (for
instance, a part typed ``output`` is advisory, while ``OUTPUT`` would be
mandatory and routed to the same handler). When no handler is known for a
mandatory part, the process is aborted and an exception is raised. If the
part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read after an abort are processed. In
the future, dropping the stream may become an option for channels we do not
care to preserve.
"""

from __future__ import absolute_import, division

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    phases,
    pushkey,
    pycompat,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>' + ('BB' * nbparams)

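# Illustrative sketch (editor's note, not part of bundle2's API): for two
# parameters the helper above yields '>BBBB', i.e. four one-byte sizes
# unpacked in a single struct call.
def _demoparamsizes():
    fmt = _makefpartparamsizes(2)
    assert fmt == '>BBBB'
    assert struct.calcsize(fmt) == 4
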
parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iteration happens in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__

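# Illustrative sketch (editor's note, not part of bundle2's API): typical use
# of the record container above, with a hypothetical 'changegroup' entry.
def _demorecords():
    records = unbundlerecords()
    records.add('changegroup', {'return': 1})
    assert records['changegroup'] == ({'return': 1},)
    assert list(records) == [('changegroup', {'return': 1})]
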
class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.reply = None
        self.captureoutput = captureoutput
        self.hookargs = {}
        self._gettransaction = transactiongetter

    def gettransaction(self):
        transaction = self._gettransaction()

        if self.hookargs:
            # the ones added to the transaction supersede those added
            # to the operation.
            self.hookargs.update(transaction.hookargs)
            transaction.hookargs = self.hookargs

            # mark the hookargs as flushed. further attempts to add to
            # hookargs will result in an abort.
            self.hookargs = None

        return transaction

    def addhookargs(self, hookargs):
        if self.hookargs is None:
            raise error.ProgrammingError('attempted to add hookargs to '
                                         'operation after transaction started')
        self.hookargs.update(hookargs)

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op

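# Illustrative sketch (editor's note, not part of bundle2's API): the typical
# call shape for applybundle, assuming an already-open transaction 'tr', a
# binary stream 'fp', and 'push' as a plausible source label.
def _demoapplybundle(repo, fp, tr):
    unbundler = getunbundler(repo.ui, fp)
    return applybundle(repo, unbundler, tr, source='push')
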
class partiterator(object):
    def __init__(self, repo, op, unbundler):
        self.repo = repo
        self.op = op
        self.unbundler = unbundler
        self.iterator = None
        self.count = 0
        self.current = None

    def __enter__(self):
        def func():
            itr = enumerate(self.unbundler.iterparts())
            for count, p in itr:
                self.count = count
                self.current = p
                yield p
                p.seek(0, 2)
                self.current = None
        self.iterator = func()
        return self.iterator

    def __exit__(self, type, exc, tb):
        if not self.iterator:
            return

        if exc:
            # If exiting or interrupted, do not attempt to seek the stream in
            # the finally block below. This makes abort faster.
            if (self.current and
                not isinstance(exc, (SystemExit, KeyboardInterrupt))):
                # consume the part content to not corrupt the stream.
                self.current.seek(0, 2)

            # Any exceptions seeking to the end of the bundle at this point
            # are almost certainly related to the underlying stream being bad.
            # And, chances are that the exception we're handling is related to
            # getting in that bad state. So, we swallow the seeking error and
            # re-raise the original error.
            seekerror = False
            try:
                for part in self.iterator:
                    # consume the bundle content
                    part.seek(0, 2)
            except Exception:
                seekerror = True

            # Small hack to let caller code distinguish exceptions from
            # bundle2 processing from processing the old format. This is
            # mostly needed to handle different return codes to unbundle
            # according to the type of bundle. We should probably clean up or
            # drop this return code craziness in a future version.
            exc.duringunbundle2 = True
            salvaged = []
            replycaps = None
            if self.op.reply is not None:
                salvaged = self.op.reply.salvageoutput()
                replycaps = self.op.reply.capabilities
            exc._replycaps = replycaps
            exc._bundle2salvagedoutput = salvaged

            # Re-raising from a variable loses the original stack. So only use
            # that form if we need to.
            if seekerror:
                raise exc

        self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
                           self.count)

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying its effects to/from a repo

    It iterates over each part, then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op._gettransaction is None or op._gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))

    processparts(repo, op, unbundler)

    return op

def processparts(repo, op, unbundler):
    with partiterator(repo, op, unbundler) as parts:
        for part in parts:
            _processpart(op, part)

def _processchangegroup(op, cg, tr, source, url, **kwargs):
    ret = cg.apply(op.repo, tr, source, url, **kwargs)
    op.records.add('changegroup', {
        'return': ret,
    })
    return ret

def _gethandler(op, part):
    status = 'unknown' # used by debug output
    try:
        handler = parthandlermapping.get(part.type)
        if handler is None:
            status = 'unsupported-type'
            raise error.BundleUnknownFeatureError(parttype=part.type)
        indebug(op.ui, 'found a handler for part %s' % part.type)
        unknownparams = part.mandatorykeys - handler.params
        if unknownparams:
            unknownparams = list(unknownparams)
            unknownparams.sort()
            status = 'unsupported-params (%s)' % ', '.join(unknownparams)
            raise error.BundleUnknownFeatureError(parttype=part.type,
                                                  params=unknownparams)
        status = 'supported'
    except error.BundleUnknownFeatureError as exc:
        if part.mandatory: # mandatory parts
            raise
        indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
        return # skip to part processing
    finally:
        if op.ui.debugflag:
            msg = ['bundle2-input-part: "%s"' % part.type]
            if not part.mandatory:
                msg.append(' (advisory)')
            nbmp = len(part.mandatorykeys)
            nbap = len(part.params) - nbmp
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            msg.append(' %s\n' % status)
            op.ui.debug(''.join(msg))

    return handler

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    handler = _gethandler(op, part)
    if handler is None:
        return

    # handler is called outside the above try block so that we don't
    # risk catching KeyErrors from anything other than the
    # parthandlermapping lookup (any KeyError raised by handler()
    # itself represents a defect of a different variety).
    output = None
    if op.captureoutput and op.reply is not None:
        op.ui.pushbuffer(error=True, subproc=True)
        output = ''
    try:
        handler(op, part)
    finally:
        if output is not None:
            output = op.ui.popbuffer()
        if output:
            outpart = op.reply.newpart('output', data=output,
                                       mandatory=False)
            outpart.addparam(
                'in-reply-to', pycompat.bytestr(part.id), mandatory=False)

def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

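# Illustrative sketch (editor's note, not part of bundle2's API): the two
# helpers above round-trip, shown with a made-up capability name.
def _democaps():
    blob = encodecaps({'mycap': ['a', 'b'], 'plain': []})
    assert blob == 'mycap=a,b\nplain'
    assert decodecaps(blob) == {'mycap': ['a', 'b'], 'plain': []}
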
bundletypes = {
    "": ("", 'UN'),  # only when using unbundle on ssh and old http servers
                     # since the unification ssh accepts a header but there
                     # is no capability signaling it.
    "HG20": (),  # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError(r'empty parameter name')
        if name[0:1] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError(r'non letter first character: %s' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means that
        any failure to properly initialize the part after calling ``newpart``
        should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)

    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged

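# Illustrative sketch (editor's note, not part of bundle2's API): assembling
# a small HG20 bundle in memory; 'ui' is assumed to be a Mercurial ui object,
# and 'output' is an advisory part type used elsewhere in this module.
def _demobundle20(ui):
    bundler = bundle20(ui)
    bundler.newpart('output', data='hello', mandatory=False)
    return ''.join(bundler.getchunks())
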
class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

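# Illustrative sketch (editor's note, not part of bundle2's API): reading a
# bundle from an open binary stream 'fp' and walking its parts.
def _demounbundle(ui, fp):
    unbundler = getunbundler(ui, fp)
    for part in unbundler.iterparts():
        ui.debug('saw part %s (id %i)\n' % (part.type, part.id))
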
740 class unbundle20(unpackermixin):
740 class unbundle20(unpackermixin):
741 """interpret a bundle2 stream
741 """interpret a bundle2 stream
742
742
743 This class is fed with a binary stream and yields parts through its
743 This class is fed with a binary stream and yields parts through its
744 `iterparts` methods."""
744 `iterparts` methods."""
745
745
746 _magicstring = 'HG20'
746 _magicstring = 'HG20'
747
747
748 def __init__(self, ui, fp):
748 def __init__(self, ui, fp):
749 """If header is specified, we do not read it out of the stream."""
749 """If header is specified, we do not read it out of the stream."""
750 self.ui = ui
750 self.ui = ui
751 self._compengine = util.compengines.forbundletype('UN')
751 self._compengine = util.compengines.forbundletype('UN')
752 self._compressed = None
752 self._compressed = None
753 super(unbundle20, self).__init__(fp)
753 super(unbundle20, self).__init__(fp)
754
754
755 @util.propertycache
755 @util.propertycache
756 def params(self):
756 def params(self):
757 """dictionary of stream level parameters"""
757 """dictionary of stream level parameters"""
758 indebug(self.ui, 'reading bundle2 stream parameters')
758 indebug(self.ui, 'reading bundle2 stream parameters')
759 params = {}
759 params = {}
760 paramssize = self._unpack(_fstreamparamsize)[0]
760 paramssize = self._unpack(_fstreamparamsize)[0]
761 if paramssize < 0:
761 if paramssize < 0:
762 raise error.BundleValueError('negative bundle param size: %i'
762 raise error.BundleValueError('negative bundle param size: %i'
763 % paramssize)
763 % paramssize)
764 if paramssize:
764 if paramssize:
765 params = self._readexact(paramssize)
765 params = self._readexact(paramssize)
766 params = self._processallparams(params)
766 params = self._processallparams(params)
767 return params
767 return params
768
768
769 def _processallparams(self, paramsblock):
769 def _processallparams(self, paramsblock):
770 """"""
770 """"""
771 params = util.sortdict()
771 params = util.sortdict()
772 for p in paramsblock.split(' '):
772 for p in paramsblock.split(' '):
773 p = p.split('=', 1)
773 p = p.split('=', 1)
774 p = [urlreq.unquote(i) for i in p]
774 p = [urlreq.unquote(i) for i in p]
775 if len(p) < 2:
775 if len(p) < 2:
776 p.append(None)
776 p.append(None)
777 self._processparam(*p)
777 self._processparam(*p)
778 params[p[0]] = p[1]
778 params[p[0]] = p[1]
779 return params
779 return params
780
780
781
781
782 def _processparam(self, name, value):
782 def _processparam(self, name, value):
783 """process a parameter, applying its effect if needed
783 """process a parameter, applying its effect if needed
784
784
785 Parameter starting with a lower case letter are advisory and will be
785 Parameter starting with a lower case letter are advisory and will be
786 ignored when unknown. Those starting with an upper case letter are
786 ignored when unknown. Those starting with an upper case letter are
787 mandatory and will this function will raise a KeyError when unknown.
787 mandatory and will this function will raise a KeyError when unknown.
788
788
789 Note: no option are currently supported. Any input will be either
789 Note: no option are currently supported. Any input will be either
790 ignored or failing.
790 ignored or failing.
791 """
791 """
792 if not name:
792 if not name:
793 raise ValueError(r'empty parameter name')
793 raise ValueError(r'empty parameter name')
794 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
794 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
795 raise ValueError(r'non letter first character: %s' % name)
795 raise ValueError(r'non letter first character: %s' % name)
796 try:
796 try:
797 handler = b2streamparamsmap[name.lower()]
797 handler = b2streamparamsmap[name.lower()]
798 except KeyError:
798 except KeyError:
799 if name[0:1].islower():
799 if name[0:1].islower():
800 indebug(self.ui, "ignoring unknown parameter %s" % name)
800 indebug(self.ui, "ignoring unknown parameter %s" % name)
801 else:
801 else:
802 raise error.BundleUnknownFeatureError(params=(name,))
802 raise error.BundleUnknownFeatureError(params=(name,))
803 else:
803 else:
804 handler(self, name, value)
804 handler(self, name, value)
805
805
806 def _forwardchunks(self):
806 def _forwardchunks(self):
807 """utility to transfer a bundle2 as binary
807 """utility to transfer a bundle2 as binary
808
808
809 This is made necessary by the fact the 'getbundle' command over 'ssh'
809 This is made necessary by the fact the 'getbundle' command over 'ssh'
810 have no way to know then the reply end, relying on the bundle to be
810 have no way to know then the reply end, relying on the bundle to be
811 interpreted to know its end. This is terrible and we are sorry, but we
811 interpreted to know its end. This is terrible and we are sorry, but we
812 needed to move forward to get general delta enabled.
812 needed to move forward to get general delta enabled.
813 """
813 """
814 yield self._magicstring
814 yield self._magicstring
815 assert 'params' not in vars(self)
815 assert 'params' not in vars(self)
816 paramssize = self._unpack(_fstreamparamsize)[0]
816 paramssize = self._unpack(_fstreamparamsize)[0]
817 if paramssize < 0:
817 if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
            assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)


    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, the payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            # Seek to the end of the part to force its consumption so the next
            # part can be read. But then seek back to the beginning so the
            # code consuming this generator has a part that starts at 0.
            part.seek(0, 2)
            part.seek(0)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """reads the part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

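# The chunk-walking code above follows the on-the-wire framing: every chunk
# is prefixed by a signed int32 size, where a positive size announces payload
# bytes, 0 is an empty chunk, -1 (``flaginterrupt``) flags an out-of-band
# interrupt part, and two consecutive empty chunks terminate the stream. The
# following stand-alone sketch (illustrative only, not used by this module)
# shows that framing on a bare file object; ``fh`` is assumed to be a binary
# stream positioned on the first chunk size.
def _sketchwalkchunks(fh):
    """sketch: yield raw payload chunks from a bundle2 chunk stream"""
    fmt = _fpartheadersize  # signed int32, shared with the payload size field
    emptycount = 0
    while emptycount < 2:
        size = _unpack(fmt, fh.read(struct.calcsize(fmt)))[0]
        if size > 0:
            emptycount = 0
            yield fh.read(size)
        elif size == 0:
            emptycount += 1
        else:
            # -1 would announce an interleaved out-of-band part; a real
            # reader hands control to ``interrupthandler`` at this point.
            raise NotImplementedError('interrupt parts not handled in sketch')
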
formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True

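# Illustrative sketch: this is all a third-party stream-level parameter
# handler has to do. The parameter name 'example-noop' is an assumption of
# this example; a handler receives the unbundler plus the parameter name and
# value, and typically installs state on the unbundler the way
# ``processcompression`` does above.
@b2streamparamhandler('example-noop')
def _processexamplenoop(unbundler, param, value):
    """sketch: accept an advisory 'example-noop' stream parameter"""
    indebug(unbundler.ui, 'ignoring example-noop=%s' % value)
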
class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. The remote side
    should be able to safely ignore the advisory ones.

    Neither data nor parameters may be modified after generation has begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
                % (cls, id(self), self.id, self.type, self.mandatory))

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif (util.safehasattr(self.data, 'next')
                  or util.safehasattr(self.data, '__next__')):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        try:
            headerchunk = ''.join(header)
        except TypeError:
            raise TypeError(r'Found a non-bytes trying to '
                            r'build bundle part header: %r' % header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            bexc = util.forcebytestr(exc)
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % bexc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % bexc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if (util.safehasattr(self.data, 'next')
            or util.safehasattr(self.data, '__next__')):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


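# Illustrative sketch (not used by this module): building one part by hand.
# ``bundle20.newpart`` normally creates parts and assigns their ids; doing it
# manually shows the moving pieces. ``ui`` and ``cgchunks`` (an iterable of
# byte chunks) are assumptions of this example.
def _sketchbuildpart(ui, cgchunks):
    part = bundlepart('changegroup', data=cgchunks)
    part.id = 0  # normally assigned when the part joins a bundle20
    part.addparam('version', '02')
    part.addparam('nbchanges', '12', mandatory=False)
    # getchunks() serializes the header, then size-prefixed payload chunks
    return ''.join(part.getchunks(ui=ui))
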
flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting an exception raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads the part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        hardabort = False
        try:
            _processpart(op, part)
        except (SystemExit, KeyboardInterrupt):
            hardabort = True
            raise
        finally:
            if not hardabort:
                part.seek(0, 2)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] #(payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to setup all logic related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos, self._tellfp()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

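# Illustrative sketch (not used by this module): draining every part of an
# incoming bundle with the classes above. ``fp`` is assumed to be a binary
# stream positioned just after the 'HG20' magic string, e.g. as handed out
# by ``exchange.readbundle``.
def _sketchreadparts(ui, fp):
    unbundler = unbundle20(ui, fp)
    for part in unbundler.iterparts():
        indebug(ui, 'saw part %s of type %s' % (part.id, part.type))
        part.read()  # consume the payload before the next header
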
# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
                'phases': ('heads',),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
        caps.pop('phases')
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(raw)
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps
    dict"""
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

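# Worked example for ``obsmarkersversion``: a caps dict advertising
# {'obsmarkers': ('V0', 'V1')} decodes to [0, 1]; entries without the 'V'
# prefix are silently ignored. The function below is a sketch, kept only to
# make the decoding concrete.
def _sketchobsversions():
    assert obsmarkersversion({'obsmarkers': ('V0', 'V1')}) == [0, 1]
    assert obsmarkersversion({}) == []
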
def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.makechangegroup(repo, outgoing, '01', source)
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The type of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we always want a changegroup in such bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                      mandatory=False)
    if opts.get('phases') and repo.revs('%ln and secret()',
                                        outgoing.missingheads):
        part.addparam('targetphase', '%d' % phases.secret, mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = phases.binaryencode(headsbyphase)
        bundler.newpart('phase-heads', data=phasedata)

def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changeset
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

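# Illustrative sketch: the 'hgtagsfnodes' payload assembled above is a flat
# run of fixed-width records, 20 bytes of changeset node followed by
# 20 bytes of .hgtags filenode per head (hence the "40 bytes per head" cited
# in the comment). A receiver could split it back apart like this.
def _sketchsplitfnodes(data):
    for offset in range(0, len(data), 40):
        yield data[offset:offset + 20], data[offset + 20:offset + 40]
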
def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only support v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

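# Illustrative sketch (not used by this module): writing the outgoing set of
# a repo to an uncompressed HG20 bundle file via the helper above. ``repo``,
# ``outgoing``, ``filename`` and the 'bundle' source string are assumptions
# of this example; see ``writenewbundle`` for the richer, options-driven
# entry point.
def _sketchwritebundle(ui, repo, outgoing, filename):
    cg = changegroup.makechangegroup(repo, outgoing, '02', 'bundle')
    return writebundle(ui, cg, filename, 'HG20')
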
def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

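# Worked example for the return-value arithmetic above: an addchangegroup
# result encodes head movement around a baseline of +/-1, so 3 means
# "success, two extra heads" and -2 means "success, one head removed".
# Combining [3, -2] gives changedheads == (3 - 1) + (-2 + 1) == 1, hence a
# final result of 1 + 1 == 2; any single 0 short-circuits the whole
# combination to 0.
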
1588 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1588 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1589 'targetphase'))
1589 'targetphase'))
1590 def handlechangegroup(op, inpart):
1590 def handlechangegroup(op, inpart):
1591 """apply a changegroup part on the repo
1591 """apply a changegroup part on the repo
1592
1592
1593 This is a very early implementation that will massive rework before being
1593 This is a very early implementation that will massive rework before being
1594 inflicted to any end-user.
1594 inflicted to any end-user.
1595 """
1595 """
1596 tr = op.gettransaction()
1596 tr = op.gettransaction()
1597 unpackerversion = inpart.params.get('version', '01')
1597 unpackerversion = inpart.params.get('version', '01')
1598 # We should raise an appropriate exception here
1598 # We should raise an appropriate exception here
1599 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1599 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1600 # the source and url passed here are overwritten by the one contained in
1600 # the source and url passed here are overwritten by the one contained in
1601 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1601 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1602 nbchangesets = None
1602 nbchangesets = None
1603 if 'nbchanges' in inpart.params:
1603 if 'nbchanges' in inpart.params:
1604 nbchangesets = int(inpart.params.get('nbchanges'))
1604 nbchangesets = int(inpart.params.get('nbchanges'))
1605 if ('treemanifest' in inpart.params and
1605 if ('treemanifest' in inpart.params and
1606 'treemanifest' not in op.repo.requirements):
1606 'treemanifest' not in op.repo.requirements):
1607 if len(op.repo.changelog) != 0:
1607 if len(op.repo.changelog) != 0:
1608 raise error.Abort(_(
1608 raise error.Abort(_(
1609 "bundle contains tree manifests, but local repo is "
1609 "bundle contains tree manifests, but local repo is "
1610 "non-empty and does not use tree manifests"))
1610 "non-empty and does not use tree manifests"))
1611 op.repo.requirements.add('treemanifest')
1611 op.repo.requirements.add('treemanifest')
1612 op.repo._applyopenerreqs()
1612 op.repo._applyopenerreqs()
1613 op.repo._writerequirements()
1613 op.repo._writerequirements()
1614 extrakwargs = {}
1614 extrakwargs = {}
1615 targetphase = inpart.params.get('targetphase')
1615 targetphase = inpart.params.get('targetphase')
1616 if targetphase is not None:
1616 if targetphase is not None:
1617 extrakwargs['targetphase'] = int(targetphase)
1617 extrakwargs['targetphase'] = int(targetphase)
1618 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1618 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1619 expectedtotal=nbchangesets, **extrakwargs)
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
    ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given a url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
    - url: the url to the bundle10.
    - size: the bundle10 file size. It is used to validate that what was
      retrieved by the client matches the server's knowledge about the bundle.
    - digests: a space separated list of the digest types provided as
      parameters.
    - digest:<digest-type>: the hexadecimal representation of the digest with
      that name. Like the size, it is used to validate that what was retrieved
      by the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

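# Illustration only, not part of bundle2.py: a minimal standalone sketch of
# the size-plus-digest validation that util.digestchecker performs on the
# downloaded payload. The names checkpayload, expectedsize and
# expecteddigests are hypothetical.
def checkpayload(data, expectedsize, expecteddigests):
    import hashlib
    if len(data) != expectedsize:
        raise ValueError('size mismatch: expected %d, got %d'
                         % (expectedsize, len(data)))
    for algo, expectedhex in expecteddigests.items():
        h = hashlib.new(algo)
        h.update(data)
        if h.hexdigest() != expectedhex:
            raise ValueError('%s mismatch: expected %s, got %s'
                             % (algo, expectedhex, h.hexdigest()))

# Usage sketch:
#   checkpayload(blob, len(blob), {'sha1': hashlib.sha1(blob).hexdigest()})
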
@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

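# Illustration only: the payload of 'check:heads' (and 'check:updated-heads'
# below) is raw 20-byte sha1 nodes back to back, which is why the handler
# above reads fixed 20-byte chunks until the part is exhausted. encodeheads
# and decodeheads are hypothetical helpers, not Mercurial API.
def encodeheads(heads):
    assert all(len(h) == 20 for h in heads)
    return ''.join(heads)

def decodeheads(payload):
    if len(payload) % 20:
        raise ValueError('truncated heads payload')
    return [payload[i:i + 20] for i in range(0, len(payload), 20)]
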
@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for a race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually
    updated during the push. If other activity happens on unrelated heads,
    it is ignored.

    This allows servers with high traffic to avoid push contention as long
    as unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

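# Illustration only: the listkeys payload decoded above is line oriented, one
# key/value pair per line. A rough sketch of a decoder, assuming a plain
# tab-separated layout (pushkey.decodekeys is the authoritative
# implementation and may apply additional escaping):
def decodelistkeys(payload):
    result = {}
    for line in payload.splitlines():
        key, value = line.split('\t', 1)
        result[key] = value
    return result
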
@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

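# Illustration only: repo.pushkey() follows compare-and-set semantics, so an
# update only wins if the remote value still matches 'old'. A toy model over
# a plain dict (compareandset and store are hypothetical names; the real
# repo.pushkey dispatches to per-namespace handlers):
def compareandset(store, key, old, new):
    if store.get(key, '') != old:
        return 0  # refused; surfaced as the 'return' param of reply:pushkey
    store[key] = new
    return 1
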
@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = phases.binarydecode(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
-    op.records.add('phase-heads', {})

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not '
                         'enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers part"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)

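# Illustration only: the 'hgtagsfnodes' payload is a flat run of 40-byte
# records, a 20-byte changeset node followed by its 20-byte .hgtags filenode.
# iternodepairs is a hypothetical reader for that layout:
def iternodepairs(payload):
    for offset in range(0, len(payload), 40):
        record = payload[offset:offset + 40]
        if len(record) < 40:
            break  # ignore incomplete trailing data, as the handler does
        yield record[:20], record[20:]
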
@parthandler('pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on server-side for security reasons
    if op.ui.configbool('push', 'pushvars.server'):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvar flag.
            key = "USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)
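
# Illustration only: the renaming applied above, as a hypothetical helper
# (pushvarstohookargs is not a Mercurial function). A --pushvar DEBUG=1 on
# the client thus surfaces as USERVAR_DEBUG=1 in server-side hooks:
def pushvarstohookargs(advisoryparams):
    return dict(('USERVAR_' + key.upper(), value)
                for key, value in advisoryparams)
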
@@ -1,2064 +1,2060
# exchange.py - utility to exchange data between repos.
#
# Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import collections
import errno
import hashlib

from .i18n import _
from .node import (
    hex,
    nullid,
)
from . import (
    bookmarks as bookmod,
    bundle2,
    changegroup,
    discovery,
    error,
    lock as lockmod,
    obsolete,
    phases,
    pushkey,
    pycompat,
    scmutil,
    sslutil,
    streamclone,
    url as urlmod,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

# Maps bundle version human names to changegroup versions.
_bundlespeccgversions = {'v1': '01',
                         'v2': '02',
                         'packed1': 's1',
                         'bundle2': '02', #legacy
                        }

# Compression engines allowed in version 1. THIS SHOULD NEVER CHANGE.
_bundlespecv1compengines = {'gzip', 'bzip2', 'none'}

def parsebundlespec(repo, spec, strict=True, externalnames=False):
    """Parse a bundle string specification into parts.

    Bundle specifications denote a well-defined bundle/exchange format.
    The content of a given specification should not change over time in
    order to ensure that bundles produced by a newer version of Mercurial are
    readable from an older version.

    The string currently has the form:

       <compression>-<type>[;<parameter0>[;<parameter1>]]

    Where <compression> is one of the supported compression formats
    and <type> is (currently) a version string. A ";" can follow the type and
    all text afterwards is interpreted as URI encoded, ";" delimited key=value
    pairs.

    If ``strict`` is True (the default) <compression> is required. Otherwise,
    it is optional.

    If ``externalnames`` is False (the default), the human-centric names will
    be converted to their internal representation.

    Returns a 3-tuple of (compression, version, parameters). Compression will
    be ``None`` if not in strict mode and a compression isn't defined.

    An ``InvalidBundleSpecification`` is raised when the specification is
    not syntactically well formed.

    An ``UnsupportedBundleSpecification`` is raised when the compression or
    bundle type/version is not recognized.

    Note: this function will likely eventually return a more complex data
    structure, including bundle2 part information.
    """
    def parseparams(s):
        if ';' not in s:
            return s, {}

        params = {}
        version, paramstr = s.split(';', 1)

        for p in paramstr.split(';'):
            if '=' not in p:
                raise error.InvalidBundleSpecification(
                    _('invalid bundle specification: '
                      'missing "=" in parameter: %s') % p)

            key, value = p.split('=', 1)
            key = urlreq.unquote(key)
            value = urlreq.unquote(value)
            params[key] = value

        return version, params


    if strict and '-' not in spec:
        raise error.InvalidBundleSpecification(
                _('invalid bundle specification; '
                  'must be prefixed with compression: %s') % spec)

    if '-' in spec:
        compression, version = spec.split('-', 1)

        if compression not in util.compengines.supportedbundlenames:
            raise error.UnsupportedBundleSpecification(
                    _('%s compression is not supported') % compression)

        version, params = parseparams(version)

        if version not in _bundlespeccgversions:
            raise error.UnsupportedBundleSpecification(
                    _('%s is not a recognized bundle version') % version)
    else:
        # Value could be just the compression or just the version, in which
        # case some defaults are assumed (but only when not in strict mode).
        assert not strict

        spec, params = parseparams(spec)

        if spec in util.compengines.supportedbundlenames:
            compression = spec
            version = 'v1'
            # Generaldelta repos require v2.
            if 'generaldelta' in repo.requirements:
                version = 'v2'
            # Modern compression engines require v2.
            if compression not in _bundlespecv1compengines:
                version = 'v2'
        elif spec in _bundlespeccgversions:
            if spec == 'packed1':
                compression = 'none'
            else:
                compression = 'bzip2'
            version = spec
        else:
            raise error.UnsupportedBundleSpecification(
                    _('%s is not a recognized bundle specification') % spec)

    # Bundle version 1 only supports a known set of compression engines.
    if version == 'v1' and compression not in _bundlespecv1compengines:
        raise error.UnsupportedBundleSpecification(
            _('compression engine %s is not supported on v1 bundles') %
            compression)

    # The specification for packed1 can optionally declare the data formats
    # required to apply it. If we see this metadata, compare against what the
    # repo supports and error if the bundle isn't compatible.
    if version == 'packed1' and 'requirements' in params:
        requirements = set(params['requirements'].split(','))
        missingreqs = requirements - repo.supportedformats
        if missingreqs:
            raise error.UnsupportedBundleSpecification(
                    _('missing support for repository features: %s') %
                      ', '.join(sorted(missingreqs)))

    if not externalnames:
        engine = util.compengines.forbundlename(compression)
        compression = engine.bundletype()[1]
        version = _bundlespeccgversions[version]
    return compression, version, params

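# Illustration only: a strict-mode-only sketch of the grammar documented in
# parsebundlespec, with none of its validation or defaulting. parsespec is a
# hypothetical name; urlreq.unquote stands in for the URI decoding:
def parsespec(spec):
    compression, rest = spec.split('-', 1)
    version, _sep, paramstr = rest.partition(';')
    params = {}
    for p in (paramstr.split(';') if paramstr else []):
        key, value = p.split('=', 1)
        params[urlreq.unquote(key)] = urlreq.unquote(value)
    return compression, version, params

# Usage sketch:
#   parsespec('gzip-v2')
#   -> ('gzip', 'v2', {})
#   parsespec('none-packed1;requirements=generaldelta')
#   -> ('none', 'packed1', {'requirements': 'generaldelta'})
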
def readbundle(ui, fh, fname, vfs=None):
    header = changegroup.readexactly(fh, 4)

    alg = None
    if not fname:
        fname = "stream"
        if not header.startswith('HG') and header.startswith('\0'):
            fh = changegroup.headerlessfixup(fh, header)
            header = "HG10"
            alg = 'UN'
    elif vfs:
        fname = vfs.join(fname)

    magic, version = header[0:2], header[2:4]

    if magic != 'HG':
        raise error.Abort(_('%s: not a Mercurial bundle') % fname)
    if version == '10':
        if alg is None:
            alg = changegroup.readexactly(fh, 2)
        return changegroup.cg1unpacker(fh, alg)
    elif version.startswith('2'):
        return bundle2.getunbundler(ui, fh, magicstring=magic + version)
    elif version == 'S1':
        return streamclone.streamcloneapplier(fh)
    else:
        raise error.Abort(_('%s: unknown bundle version %s') % (fname, version))

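# Illustration only: how the 4-byte header drives the dispatch in readbundle.
# classifybundle is a hypothetical helper, not Mercurial API:
def classifybundle(header):
    if not header.startswith('HG'):
        # a headerless changegroup starts with a NUL byte
        return 'headerless-cg1' if header.startswith('\0') else 'unknown'
    version = header[2:4]
    if version == '10':
        return 'changegroup-v1'  # the next two bytes name the compression
    if version.startswith('2'):
        return 'bundle2'
    if version == 'S1':
        return 'streamclone'
    return 'unknown'
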
def getbundlespec(ui, fh):
    """Infer the bundlespec from a bundle file handle.

    The input file handle is read from, and its original seek position is
    not restored.
    """
    def speccompression(alg):
        try:
            return util.compengines.forbundletype(alg).bundletype()[0]
        except KeyError:
            return None

    b = readbundle(ui, fh, None)
    if isinstance(b, changegroup.cg1unpacker):
        alg = b._type
        if alg == '_truncatedBZ':
            alg = 'BZ'
        comp = speccompression(alg)
        if not comp:
            raise error.Abort(_('unknown compression algorithm: %s') % alg)
        return '%s-v1' % comp
    elif isinstance(b, bundle2.unbundle20):
        if 'Compression' in b.params:
            comp = speccompression(b.params['Compression'])
            if not comp:
                raise error.Abort(_('unknown compression algorithm: %s') % comp)
        else:
            comp = 'none'

        version = None
        for part in b.iterparts():
            if part.type == 'changegroup':
                version = part.params['version']
                if version in ('01', '02'):
                    version = 'v2'
                else:
                    raise error.Abort(_('changegroup version %s does not have '
                                        'a known bundlespec') % version,
                                      hint=_('try upgrading your Mercurial '
                                             'client'))

        if not version:
            raise error.Abort(_('could not identify changegroup version in '
                                'bundle'))

        return '%s-%s' % (comp, version)
    elif isinstance(b, streamclone.streamcloneapplier):
        requirements = streamclone.readbundle1header(fh)[2]
        params = 'requirements=%s' % ','.join(sorted(requirements))
        return 'none-packed1;%s' % urlreq.quote(params)
    else:
        raise error.Abort(_('unknown bundle type: %s') % b)

def _computeoutgoing(repo, heads, common):
    """Computes which revs are outgoing given a set of common
    and a set of heads.

    This is a separate function so extensions can have access to
    the logic.

    Returns a discovery.outgoing object.
    """
    cl = repo.changelog
    if common:
        hasnode = cl.hasnode
        common = [n for n in common if hasnode(n)]
    else:
        common = [nullid]
    if not heads:
        heads = cl.heads()
    return discovery.outgoing(repo, common, heads)

def _forcebundle1(op):
    """return true if a pull/push must use bundle1

    This function is used to allow testing of the older bundle version"""
    ui = op.repo.ui
    # The goal of this config is to allow developers to choose the bundle
    # version used during exchange. This is especially handy during tests.
    # Value is a list of bundle versions to pick from; the highest version
    # should be used.
    #
    # developer config: devel.legacy.exchange
    exchange = ui.configlist('devel', 'legacy.exchange')
    forcebundle1 = 'bundle2' not in exchange and 'bundle1' in exchange
    return forcebundle1 or not op.remote.capable('bundle2')

class pushoperation(object):
    """An object that represents a single push operation.

    Its purpose is to carry push related state and very common operations.

    A new pushoperation should be created at the beginning of each push and
    discarded afterward.
    """

    def __init__(self, repo, remote, force=False, revs=None, newbranch=False,
                 bookmarks=(), pushvars=None):
        # repo we push from
        self.repo = repo
        self.ui = repo.ui
        # repo we push to
        self.remote = remote
        # force option provided
        self.force = force
        # revs to be pushed (None is "all")
        self.revs = revs
        # bookmarks explicitly pushed
        self.bookmarks = bookmarks
        # allow push of new branch
        self.newbranch = newbranch
        # steps already performed
        # (used to check what steps have been already performed through bundle2)
        self.stepsdone = set()
        # Integer version of the changegroup push result
        # - None means nothing to push
        # - 0 means HTTP error
        # - 1 means we pushed and remote head count is unchanged *or*
        #   we have outgoing changesets but refused to push
        # - other values as described by addchangegroup()
        self.cgresult = None
        # Boolean value for the bookmark push
        self.bkresult = None
        # discovery.outgoing object (contains common and outgoing data)
        self.outgoing = None
        # all remote topological heads before the push
        self.remoteheads = None
        # Details of the remote branch pre and post push
        #
        # mapping: {'branch': ([remoteheads],
        #                      [newheads],
        #                      [unsyncedheads],
        #                      [discardedheads])}
        # - branch: the branch name
        # - remoteheads: the list of remote heads known locally
        #                None if the branch is new
        # - newheads: the new remote heads (known locally) with outgoing pushed
        # - unsyncedheads: the list of remote heads unknown locally.
        # - discardedheads: the list of remote heads made obsolete by the push
        self.pushbranchmap = None
        # testable as a boolean indicating if any nodes are missing locally.
        self.incoming = None
        # phases changes that must be pushed along side the changesets
        self.outdatedphases = None
        # phases changes that must be pushed if changeset push fails
        self.fallbackoutdatedphases = None
        # outgoing obsmarkers
        self.outobsmarkers = set()
        # outgoing bookmarks
        self.outbookmarks = []
        # transaction manager
        self.trmanager = None
        # map { pushkey partid -> callback handling failure}
        # used to handle exception from mandatory pushkey part failure
        self.pkfailcb = {}
        # an iterable of pushvars or None
        self.pushvars = pushvars

    @util.propertycache
    def futureheads(self):
        """future remote heads if the changeset push succeeds"""
        return self.outgoing.missingheads

    @util.propertycache
    def fallbackheads(self):
        """future remote heads if the changeset push fails"""
        if self.revs is None:
            # no revs targeted for push, all common heads are relevant
            return self.outgoing.commonheads
        unfi = self.repo.unfiltered()
        # I want cheads = heads(::missingheads and ::commonheads)
        # (missingheads is revs with secret changeset filtered out)
        #
        # This can be expressed as:
        #     cheads = ( (missingheads and ::commonheads)
        #              + (commonheads and ::missingheads))"
        #              )
        #
        # while trying to push we already computed the following:
        #     common = (::commonheads)
        #     missing = ((commonheads::missingheads) - commonheads)
        #
        # We can pick:
        # * missingheads part of common (::commonheads)
        common = self.outgoing.common
        nm = self.repo.changelog.nodemap
        cheads = [node for node in self.revs if nm[node] in common]
        # and
        # * commonheads parents on missing
        revset = unfi.set('%ln and parents(roots(%ln))',
                          self.outgoing.commonheads,
                          self.outgoing.missing)
        cheads.extend(c.node() for c in revset)
        return cheads

    @property
    def commonheads(self):
        """set of all common heads after changeset bundle push"""
        if self.cgresult:
            return self.futureheads
        else:
            return self.fallbackheads

# mapping of message used when pushing bookmark
bookmsgmap = {'update': (_("updating bookmark %s\n"),
                         _('updating bookmark %s failed!\n')),
              'export': (_("exporting bookmark %s\n"),
                         _('exporting bookmark %s failed!\n')),
              'delete': (_("deleting remote bookmark %s\n"),
                         _('deleting remote bookmark %s failed!\n')),
              }


def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=(),
         opargs=None):
    '''Push outgoing changesets (limited by revs) from a local
    repository to remote. Return an integer:
      - None means nothing to push
      - 0 means HTTP error
      - 1 means we pushed and remote head count is unchanged *or*
        we have outgoing changesets but refused to push
      - other values as described by addchangegroup()
    '''
    if opargs is None:
        opargs = {}
    pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks,
                           **pycompat.strkwargs(opargs))
    if pushop.remote.local():
        missing = (set(pushop.repo.requirements)
                   - pushop.remote.local().supported)
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise error.Abort(msg)

    if not pushop.remote.canpush():
        raise error.Abort(_("destination does not support push"))

    if not pushop.remote.capable('unbundle'):
        raise error.Abort(_('cannot push: destination does not support the '
                            'unbundle wire protocol command'))

    # get lock as we might write phase data
    wlock = lock = None
    try:
        # bundle2 push may receive a reply bundle touching bookmarks or other
        # things requiring the wlock. Take it now to ensure proper ordering.
        maypushback = pushop.ui.configbool('experimental', 'bundle2.pushback')
        if (not _forcebundle1(pushop)) and maypushback:
            wlock = pushop.repo.wlock()
        lock = pushop.repo.lock()
        pushop.trmanager = transactionmanager(pushop.repo,
                                              'push-response',
                                              pushop.remote.url())
    except IOError as err:
        if err.errno != errno.EACCES:
            raise
        # source repo cannot be locked.
        # We do not abort the push, but just disable the local phase
        # synchronisation.
        msg = 'cannot lock source repository: %s\n' % err
        pushop.ui.debug(msg)

    with wlock or util.nullcontextmanager(), \
            lock or util.nullcontextmanager(), \
            pushop.trmanager or util.nullcontextmanager():
        pushop.repo.checkpush(pushop)
        _pushdiscovery(pushop)
        if not _forcebundle1(pushop):
            _pushbundle2(pushop)
        _pushchangeset(pushop)
        _pushsyncphase(pushop)
        _pushobsolete(pushop)
        _pushbookmark(pushop)

    return pushop

# list of steps to perform discovery before push
pushdiscoveryorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
pushdiscoverymapping = {}

def pushdiscovery(stepname):
    """decorator for functions performing discovery before push

    The function is added to the step -> function mapping and appended to the
    list of steps. Beware that decorated functions will be added in order
    (this may matter).

    You can only use this decorator for a new step; if you want to wrap a
    step from an extension, change the pushdiscovery dictionary directly."""
    def dec(func):
        assert stepname not in pushdiscoverymapping
        pushdiscoverymapping[stepname] = func
        pushdiscoveryorder.append(stepname)
        return func
    return dec

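# Illustration only: how an extension would register an extra discovery step
# through the decorator above; steps run in registration order. The step
# name 'examplestep' and its function are hypothetical:
@pushdiscovery('examplestep')
def _pushdiscoveryexamplestep(pushop):
    pushop.ui.debug('running example discovery step\n')
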
def _pushdiscovery(pushop):
    """Run all discovery steps"""
    for stepname in pushdiscoveryorder:
        step = pushdiscoverymapping[stepname]
        step(pushop)

@pushdiscovery('changeset')
def _pushdiscoverychangeset(pushop):
    """discover the changesets that need to be pushed"""
    fci = discovery.findcommonincoming
    commoninc = fci(pushop.repo, pushop.remote, force=pushop.force)
    common, inc, remoteheads = commoninc
    fco = discovery.findcommonoutgoing
    outgoing = fco(pushop.repo, pushop.remote, onlyheads=pushop.revs,
                   commoninc=commoninc, force=pushop.force)
    pushop.outgoing = outgoing
    pushop.remoteheads = remoteheads
    pushop.incoming = inc

@pushdiscovery('phase')
def _pushdiscoveryphase(pushop):
    """discover the phases that need to be pushed

    (computed for both the success and failure cases of the changesets push)"""
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    remotephases = pushop.remote.listkeys('phases')
    publishing = remotephases.get('publishing', False)
    if (pushop.ui.configbool('ui', '_usedassubrepo')
        and remotephases    # server supports phases
        and not pushop.outgoing.missing # no changesets to be pushed
        and publishing):
        # When:
        # - this is a subrepo push
        # - and the remote supports phases
        # - and no changesets are to be pushed
        # - and the remote is publishing
        # We may be in issue 3871 case!
        # We drop the possible phase synchronisation done by
        # courtesy to publish changesets possibly locally draft
        # on the remote.
        remotephases = {'publishing': 'True'}
    ana = phases.analyzeremotephases(pushop.repo,
                                     pushop.fallbackheads,
                                     remotephases)
    pheads, droots = ana
    extracond = ''
    if not publishing:
        extracond = ' and public()'
    revset = 'heads((%%ln::%%ln) %s)' % extracond
    # Get the list of all revs draft on remote but public here.
    # XXX Beware that this revset breaks if droots is not strictly
    # XXX roots; we may want to ensure it is, but that is costly.
    fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
    if not outgoing.missing:
        future = fallback
    else:
        # add changesets we are going to push as draft
        #
        # should not be necessary for publishing server, but because of an
        # issue fixed in xxxxx we have to do it anyway.
        fdroots = list(unfi.set('roots(%ln + %ln::)',
                                outgoing.missing, droots))
        fdroots = [f.node() for f in fdroots]
        future = list(unfi.set(revset, fdroots, pushop.futureheads))
    pushop.outdatedphases = future
    pushop.fallbackoutdatedphases = fallback

@pushdiscovery('obsmarker')
def _pushdiscoveryobsmarkers(pushop):
    if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
        and pushop.repo.obsstore
        and 'obsolete' in pushop.remote.listkeys('namespaces')):
        repo = pushop.repo
        # very naive computation that can be quite expensive on big repos.
        # However: evolution is currently slow on them anyway.
        nodes = (c.node() for c in repo.set('::%ln', pushop.futureheads))
        pushop.outobsmarkers = pushop.repo.obsstore.relevantmarkers(nodes)

@pushdiscovery('bookmarks')
def _pushdiscoverybookmarks(pushop):
    ui = pushop.ui
    repo = pushop.repo.unfiltered()
    remote = pushop.remote
    ui.debug("checking for updated bookmarks\n")
    ancestors = ()
    if pushop.revs:
        revnums = map(repo.changelog.rev, pushop.revs)
        ancestors = repo.changelog.ancestors(revnums, inclusive=True)
    remotebookmark = remote.listkeys('bookmarks')

    explicit = set([repo._bookmarks.expandname(bookmark)
                    for bookmark in pushop.bookmarks])

    remotebookmark = bookmod.unhexlifybookmarks(remotebookmark)
    comp = bookmod.comparebookmarks(repo, repo._bookmarks, remotebookmark)

    def safehex(x):
        if x is None:
            return x
        return hex(x)

    def hexifycompbookmarks(bookmarks):
        for b, scid, dcid in bookmarks:
            yield b, safehex(scid), safehex(dcid)

    comp = [hexifycompbookmarks(marks) for marks in comp]
    addsrc, adddst, advsrc, advdst, diverge, differ, invalid, same = comp

    for b, scid, dcid in advsrc:
        if b in explicit:
            explicit.remove(b)
        if not ancestors or repo[scid].rev() in ancestors:
            pushop.outbookmarks.append((b, dcid, scid))
    # search for added bookmarks
    for b, scid, dcid in addsrc:
        if b in explicit:
            explicit.remove(b)
        pushop.outbookmarks.append((b, '', scid))
    # search for overwritten bookmarks
    for b, scid, dcid in list(advdst) + list(diverge) + list(differ):
        if b in explicit:
            explicit.remove(b)
        pushop.outbookmarks.append((b, dcid, scid))
    # search for bookmarks to delete
    for b, scid, dcid in adddst:
        if b in explicit:
            explicit.remove(b)
        # treat as "deleted locally"
        pushop.outbookmarks.append((b, dcid, ''))
    # identical bookmarks shouldn't get reported
    for b, scid, dcid in same:
        if b in explicit:
            explicit.remove(b)

    if explicit:
        explicit = sorted(explicit)
        # we should probably list all of them
        ui.warn(_('bookmark %s does not exist on the local '
                  'or remote repository!\n') % explicit[0])
        pushop.bkresult = 2

    pushop.outbookmarks.sort()

def _pushcheckoutgoing(pushop):
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    if not outgoing.missing:
        # nothing to push
        scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
        return False
    # something to push
    if not pushop.force:
        # if repo.obsstore == False --> no obsolete
        # then, save the iteration
        if unfi.obsstore:
            # these messages are defined here to stay within the 80 char limit
            mso = _("push includes obsolete changeset: %s!")
            mspd = _("push includes phase-divergent changeset: %s!")
            mscd = _("push includes content-divergent changeset: %s!")
            mst = {"orphan": _("push includes orphan changeset: %s!"),
                   "phase-divergent": mspd,
                   "content-divergent": mscd}
            # If there is at least one obsolete or unstable changeset in
            # missing, at least one of the missing heads will be obsolete
            # or unstable. So checking heads only is ok.
            for node in outgoing.missingheads:
                ctx = unfi[node]
                if ctx.obsolete():
                    raise error.Abort(mso % ctx)
                elif ctx.isunstable():
                    # TODO print more than one instability in the abort
                    # message
                    raise error.Abort(mst[ctx.instabilities()[0]] % ctx)

    discovery.checkheads(pushop)
    return True

# List of names of steps to perform for an outgoing bundle2, order matters.
b2partsgenorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
b2partsgenmapping = {}

def b2partsgenerator(stepname, idx=None):
    """decorator for functions generating bundle2 parts

    The function is added to the step -> function mapping and appended to the
    list of steps. Beware that decorated functions will be added in order
    (this may matter).

    You can only use this decorator for new steps; if you want to wrap a step
    from an extension, change the b2partsgenmapping dictionary directly."""
    def dec(func):
        assert stepname not in b2partsgenmapping
        b2partsgenmapping[stepname] = func
        if idx is None:
            b2partsgenorder.append(stepname)
        else:
            b2partsgenorder.insert(idx, stepname)
        return func
    return dec

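# Illustrative sketch (not part of the original module): an extension can
# register its own part generator. The step name and the advisory part it
# adds are hypothetical; idx=0 makes the step run before the stock ones.
#
#   @b2partsgenerator('example', idx=0)
#   def _pushb2example(pushop, bundler):
#       # advisory part: an older server that does not know it will skip it
#       bundler.newpart('output', data='example payload', mandatory=False)
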
def _pushb2ctxcheckheads(pushop, bundler):
    """Generate race condition checking parts

    Exists as an independent function to aid extensions
    """
    # * 'force' does not check for push races,
    # * if we don't push anything, there is nothing to check.
    if not pushop.force and pushop.outgoing.missingheads:
        allowunrelated = 'related' in bundler.capabilities.get('checkheads', ())
        emptyremote = pushop.pushbranchmap is None
        if not allowunrelated or emptyremote:
            bundler.newpart('check:heads', data=iter(pushop.remoteheads))
        else:
            affected = set()
            for branch, heads in pushop.pushbranchmap.iteritems():
                remoteheads, newheads, unsyncedheads, discardedheads = heads
                if remoteheads is not None:
                    remote = set(remoteheads)
                    affected |= set(discardedheads) & remote
                    affected |= remote - set(newheads)
            if affected:
                data = iter(sorted(affected))
                bundler.newpart('check:updated-heads', data=data)

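# Worked example of the computation above: if branch 'default' currently has
# remote heads {A, B} and the push leaves new heads {A, C} without discarding
# anything, then B is in `remote - newheads`, so B lands in `affected` and the
# 'check:updated-heads' part asks the server to verify B is still a head when
# the bundle is applied.
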
@b2partsgenerator('changeset')
def _pushb2ctx(pushop, bundler):
    """handle changegroup push through bundle2

    addchangegroup result is stored in the ``pushop.cgresult`` attribute.
    """
    if 'changesets' in pushop.stepsdone:
        return
    pushop.stepsdone.add('changesets')
    # Send known heads to the server for race detection.
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop)

    _pushb2ctxcheckheads(pushop, bundler)

    b2caps = bundle2.bundle2caps(pushop.remote)
    version = '01'
    cgversions = b2caps.get('changegroup')
    if cgversions:  # 3.1 and 3.2 ship with an empty value
        cgversions = [v for v in cgversions
                      if v in changegroup.supportedoutgoingversions(
                          pushop.repo)]
        if not cgversions:
            raise ValueError(_('no common changegroup version'))
        version = max(cgversions)
    cgstream = changegroup.makestream(pushop.repo, pushop.outgoing, version,
                                      'push')
    cgpart = bundler.newpart('changegroup', data=cgstream)
    if cgversions:
        cgpart.addparam('version', version)
    if 'treemanifest' in pushop.repo.requirements:
        cgpart.addparam('treemanifest', '1')
    def handlereply(op):
        """extract addchangegroup returns from server reply"""
        cgreplies = op.records.getreplies(cgpart.id)
        assert len(cgreplies['changegroup']) == 1
        pushop.cgresult = cgreplies['changegroup'][0]['return']
    return handlereply

@b2partsgenerator('phase')
def _pushb2phases(pushop, bundler):
    """handle phase push through bundle2"""
    if 'phases' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    if 'pushkey' not in b2caps:
        return
    pushop.stepsdone.add('phases')
    part2node = []

    def handlefailure(pushop, exc):
        targetid = int(exc.partid)
        for partid, node in part2node:
            if partid == targetid:
                raise error.Abort(_('updating %s to public failed') % node)

    enc = pushkey.encode
    for newremotehead in pushop.outdatedphases:
        part = bundler.newpart('pushkey')
        part.addparam('namespace', enc('phases'))
        part.addparam('key', enc(newremotehead.hex()))
        part.addparam('old', enc('%d' % phases.draft))
        part.addparam('new', enc('%d' % phases.public))
        part2node.append((part.id, newremotehead))
        pushop.pkfailcb[part.id] = handlefailure

    def handlereply(op):
        for partid, node in part2node:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            assert len(results) <= 1
            msg = None
            if not results:
                msg = _('server ignored update of %s to public!\n') % node
            elif not int(results[0]['return']):
                msg = _('updating %s to public failed!\n') % node
            if msg is not None:
                pushop.ui.warn(msg)
    return handlereply

@b2partsgenerator('obsmarkers')
def _pushb2obsmarkers(pushop, bundler):
    if 'obsmarkers' in pushop.stepsdone:
        return
    remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
    if obsolete.commonversion(remoteversions) is None:
        return
    pushop.stepsdone.add('obsmarkers')
    if pushop.outobsmarkers:
        markers = sorted(pushop.outobsmarkers)
        bundle2.buildobsmarkerspart(bundler, markers)

@b2partsgenerator('bookmarks')
def _pushb2bookmarks(pushop, bundler):
    """handle bookmark push through bundle2"""
    if 'bookmarks' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    if 'pushkey' not in b2caps:
        return
    pushop.stepsdone.add('bookmarks')
    part2book = []
    enc = pushkey.encode

    def handlefailure(pushop, exc):
        targetid = int(exc.partid)
        for partid, book, action in part2book:
            if partid == targetid:
                raise error.Abort(bookmsgmap[action][1].rstrip() % book)
        # we should not be called for parts we did not generate
        assert False

    for book, old, new in pushop.outbookmarks:
        part = bundler.newpart('pushkey')
        part.addparam('namespace', enc('bookmarks'))
        part.addparam('key', enc(book))
        part.addparam('old', enc(old))
        part.addparam('new', enc(new))
        action = 'update'
        if not old:
            action = 'export'
        elif not new:
            action = 'delete'
        part2book.append((part.id, book, action))
        pushop.pkfailcb[part.id] = handlefailure

    def handlereply(op):
        ui = pushop.ui
        for partid, book, action in part2book:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            assert len(results) <= 1
            if not results:
                pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
            else:
                ret = int(results[0]['return'])
                if ret:
                    ui.status(bookmsgmap[action][0] % book)
                else:
                    ui.warn(bookmsgmap[action][1] % book)
                    if pushop.bkresult is not None:
                        pushop.bkresult = 1
    return handlereply

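# Worked example of the action classification above: pushing a brand-new
# bookmark yields (book, old='', new=<hex>) and action 'export'; deleting one
# yields (book, old=<hex>, new='') and action 'delete'; any other change is an
# 'update'.
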
@b2partsgenerator('pushvars', idx=0)
def _getbundlesendvars(pushop, bundler):
    '''send shellvars via bundle2'''
    pushvars = pushop.pushvars
    if pushvars:
        shellvars = {}
        for raw in pushvars:
            if '=' not in raw:
                msg = ("unable to parse variable '%s', should follow "
                       "'KEY=VALUE' or 'KEY=' format")
                raise error.Abort(msg % raw)
            k, v = raw.split('=', 1)
            shellvars[k] = v

        part = bundler.newpart('pushvars')

        for key, value in shellvars.iteritems():
            part.addparam(key, value, mandatory=False)

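# Usage sketch (assuming the client-side --pushvars option that populates
# pushop.pushvars): a command line such as
#
#   hg push --pushvars "DEBUG=1"
#
# ends up as a KEY=VALUE pair in the advisory 'pushvars' part built above,
# making the variable available to server-side hooks.
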
def _pushbundle2(pushop):
    """push data to the remote using bundle2

    The only currently supported type of data is changegroup, but this will
    evolve in the future."""
    bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
    pushback = (pushop.trmanager
                and pushop.ui.configbool('experimental', 'bundle2.pushback'))

    # create reply capability
    capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo,
                                                      allowpushback=pushback))
    bundler.newpart('replycaps', data=capsblob)
    replyhandlers = []
    for partgenname in b2partsgenorder:
        partgen = b2partsgenmapping[partgenname]
        ret = partgen(pushop, bundler)
        if callable(ret):
            replyhandlers.append(ret)
    # do not push if nothing to push
    if bundler.nbparts <= 1:
        return
    stream = util.chunkbuffer(bundler.getchunks())
    try:
        try:
            reply = pushop.remote.unbundle(
                stream, ['force'], pushop.remote.url())
        except error.BundleValueError as exc:
            raise error.Abort(_('missing support for %s') % exc)
        try:
            trgetter = None
            if pushback:
                trgetter = pushop.trmanager.transaction
            op = bundle2.processbundle(pushop.repo, reply, trgetter)
        except error.BundleValueError as exc:
            raise error.Abort(_('missing support for %s') % exc)
        except bundle2.AbortFromPart as exc:
            pushop.ui.status(_('remote: %s\n') % exc)
            if exc.hint is not None:
                pushop.ui.status(_('remote: %s\n') % ('(%s)' % exc.hint))
            raise error.Abort(_('push failed on remote'))
    except error.PushkeyFailed as exc:
        partid = int(exc.partid)
        if partid not in pushop.pkfailcb:
            raise
        pushop.pkfailcb[partid](pushop, exc)
    for rephand in replyhandlers:
        rephand(op)

def _pushchangeset(pushop):
    """Make the actual push of changeset bundle to remote repo"""
    if 'changesets' in pushop.stepsdone:
        return
    pushop.stepsdone.add('changesets')
    if not _pushcheckoutgoing(pushop):
        return

    # Should have verified this in push().
    assert pushop.remote.capable('unbundle')

    pushop.repo.prepushoutgoinghooks(pushop)
    outgoing = pushop.outgoing
    # TODO: get bundlecaps from remote
    bundlecaps = None
    # create a changegroup from local
    if pushop.revs is None and not (outgoing.excluded
                                    or pushop.repo.changelog.filteredrevs):
        # push everything,
        # use the fast path, no race possible on push
        cg = changegroup.makechangegroup(pushop.repo, outgoing, '01', 'push',
                                         fastpath=True, bundlecaps=bundlecaps)
    else:
        cg = changegroup.makechangegroup(pushop.repo, outgoing, '01',
                                         'push', bundlecaps=bundlecaps)

    # apply changegroup to remote
    # local repo finds heads on server, finds out what
    # revs it must push. Once revs are transferred, if the server
    # finds it has different heads (someone else won the
    # commit/push race), the server aborts.
    if pushop.force:
        remoteheads = ['force']
    else:
        remoteheads = pushop.remoteheads
    # ssh: return remote's addchangegroup()
    # http: return remote's addchangegroup() or 0 for error
    pushop.cgresult = pushop.remote.unbundle(cg, remoteheads,
                                             pushop.repo.url())

def _pushsyncphase(pushop):
    """synchronise phase information locally and remotely"""
    cheads = pushop.commonheads
    # even when we don't push, exchanging phase data is useful
    remotephases = pushop.remote.listkeys('phases')
    if (pushop.ui.configbool('ui', '_usedassubrepo')
        and remotephases    # server supports phases
        and pushop.cgresult is None # nothing was pushed
        and remotephases.get('publishing', False)):
        # When:
        # - this is a subrepo push
        # - and the remote supports phases
        # - and no changeset was pushed
        # - and the remote is publishing
        # We may be in issue 3871 case!
        # We drop the possible phase synchronisation done by
        # courtesy to publish changesets possibly locally draft
        # on the remote.
        remotephases = {'publishing': 'True'}
    if not remotephases: # old server or public only reply from non-publishing
        _localphasemove(pushop, cheads)
        # don't push any phase data as there is nothing to push
    else:
        ana = phases.analyzeremotephases(pushop.repo, cheads,
                                         remotephases)
        pheads, droots = ana
        ### Apply remote phase on local
        if remotephases.get('publishing', False):
            _localphasemove(pushop, cheads)
        else: # publish = False
            _localphasemove(pushop, pheads)
            _localphasemove(pushop, cheads, phases.draft)
        ### Apply local phase on remote

        if pushop.cgresult:
            if 'phases' in pushop.stepsdone:
                # phases already pushed through bundle2
                return
            outdated = pushop.outdatedphases
        else:
            outdated = pushop.fallbackoutdatedphases

        pushop.stepsdone.add('phases')

        # filter heads already turned public by the push
        outdated = [c for c in outdated if c.node() not in pheads]
        # fall back to the independent pushkey command
        for newremotehead in outdated:
            r = pushop.remote.pushkey('phases',
                                      newremotehead.hex(),
                                      str(phases.draft),
                                      str(phases.public))
            if not r:
                pushop.ui.warn(_('updating %s to public failed!\n')
                               % newremotehead)

def _localphasemove(pushop, nodes, phase=phases.public):
    """move <nodes> to <phase> in the local source repo"""
    if pushop.trmanager:
        phases.advanceboundary(pushop.repo,
                               pushop.trmanager.transaction(),
                               phase,
                               nodes)
    else:
        # repo is not locked, do not change any phases!
        # Informs the user that phases should have been moved when
        # applicable.
        actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
        phasestr = phases.phasenames[phase]
        if actualmoves:
            pushop.ui.status(_('cannot lock source repo, skipping '
                               'local %s phase update\n') % phasestr)

def _pushobsolete(pushop):
    """utility function to push obsolete markers to a remote"""
    if 'obsmarkers' in pushop.stepsdone:
        return
    repo = pushop.repo
    remote = pushop.remote
    pushop.stepsdone.add('obsmarkers')
    if pushop.outobsmarkers:
        pushop.ui.debug('try to push obsolete markers to remote\n')
        rslts = []
        remotedata = obsolete._pushkeyescape(sorted(pushop.outobsmarkers))
        for key in sorted(remotedata, reverse=True):
            # reverse sort to ensure we end with dump0
            data = remotedata[key]
            rslts.append(remote.pushkey('obsolete', key, '', data))
        if [r for r in rslts if not r]:
            msg = _('failed to push some obsolete markers!\n')
            repo.ui.warn(msg)

def _pushbookmark(pushop):
    """Update bookmark position on remote"""
    if pushop.cgresult == 0 or 'bookmarks' in pushop.stepsdone:
        return
    pushop.stepsdone.add('bookmarks')
    ui = pushop.ui
    remote = pushop.remote

    for b, old, new in pushop.outbookmarks:
        action = 'update'
        if not old:
            action = 'export'
        elif not new:
            action = 'delete'
        if remote.pushkey('bookmarks', b, old, new):
            ui.status(bookmsgmap[action][0] % b)
        else:
            ui.warn(bookmsgmap[action][1] % b)
            # discovery may have set the value from an invalid entry
            if pushop.bkresult is not None:
                pushop.bkresult = 1

class pulloperation(object):
    """An object that represents a single pull operation

    Its purpose is to carry pull-related state and very common operations.

    A new one should be created at the beginning of each pull and discarded
    afterward.
    """

    def __init__(self, repo, remote, heads=None, force=False, bookmarks=(),
                 remotebookmarks=None, streamclonerequested=None):
        # repo we pull into
        self.repo = repo
        # repo we pull from
        self.remote = remote
        # revisions we try to pull (None is "all")
        self.heads = heads
        # bookmarks pulled explicitly
        self.explicitbookmarks = [repo._bookmarks.expandname(bookmark)
                                  for bookmark in bookmarks]
        # do we force pull?
        self.force = force
        # whether a streaming clone was requested
        self.streamclonerequested = streamclonerequested
        # transaction manager
        self.trmanager = None
        # set of common changesets between local and remote before pull
        self.common = None
        # set of pulled heads
        self.rheads = None
        # list of missing changesets to fetch remotely
        self.fetch = None
        # remote bookmarks data
        self.remotebookmarks = remotebookmarks
        # result of changegroup pulling (used as return code by pull)
        self.cgresult = None
        # set of steps already done
        self.stepsdone = set()
        # Whether we attempted a clone from pre-generated bundles.
        self.clonebundleattempted = False

    @util.propertycache
    def pulledsubset(self):
        """heads of the set of changesets targeted by the pull"""
        # compute target subset
        if self.heads is None:
            # We pulled everything possible
            # sync on everything common
            c = set(self.common)
            ret = list(self.common)
            for n in self.rheads:
                if n not in c:
                    ret.append(n)
            return ret
        else:
            # We pulled a specific subset
            # sync on this subset
            return self.heads

    @util.propertycache
    def canusebundle2(self):
        return not _forcebundle1(self)

    @util.propertycache
    def remotebundle2caps(self):
        return bundle2.bundle2caps(self.remote)

    def gettransaction(self):
        # deprecated; talk to trmanager directly
        return self.trmanager.transaction()

class transactionmanager(util.transactional):
    """An object to manage the life cycle of a transaction

    It creates the transaction on demand and calls the appropriate hooks when
    closing the transaction."""
    def __init__(self, repo, source, url):
        self.repo = repo
        self.source = source
        self.url = url
        self._tr = None

    def transaction(self):
        """Return an open transaction object, constructing if necessary"""
        if not self._tr:
            trname = '%s\n%s' % (self.source, util.hidepassword(self.url))
            self._tr = self.repo.transaction(trname)
            self._tr.hookargs['source'] = self.source
            self._tr.hookargs['url'] = self.url
        return self._tr

    def close(self):
        """close transaction if created"""
        if self._tr is not None:
            self._tr.close()

    def release(self):
        """release transaction if created"""
        if self._tr is not None:
            self._tr.release()

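# Usage sketch (hypothetical caller, mirroring what pull() does below): the
# manager opens its transaction lazily and must always be released so that an
# unclosed transaction is rolled back.
#
#   trmanager = transactionmanager(repo, 'pull', remote.url())
#   try:
#       tr = trmanager.transaction()  # created on first use
#       ...                           # apply incoming data under tr
#       trmanager.close()             # commit if everything succeeded
#   finally:
#       trmanager.release()           # rollback if close() was never reached
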
def pull(repo, remote, heads=None, force=False, bookmarks=(), opargs=None,
         streamclonerequested=None):
    """Fetch repository data from a remote.

    This is the main function used to retrieve data from a remote repository.

    ``repo`` is the local repository to clone into.
    ``remote`` is a peer instance.
    ``heads`` is an iterable of revisions we want to pull. ``None`` (the
    default) means to pull everything from the remote.
    ``bookmarks`` is an iterable of bookmarks requested to be pulled. By
    default, all remote bookmarks are pulled.
    ``opargs`` are additional keyword arguments to pass to ``pulloperation``
    initialization.
    ``streamclonerequested`` is a boolean indicating whether a "streaming
    clone" is requested. A "streaming clone" is essentially a raw file copy
    of revlogs from the server. This only works when the local repository is
    empty. The default value of ``None`` means to respect the server
    configuration for preferring stream clones.

    Returns the ``pulloperation`` created for this pull.
    """
    if opargs is None:
        opargs = {}
    pullop = pulloperation(repo, remote, heads, force, bookmarks=bookmarks,
                           streamclonerequested=streamclonerequested, **opargs)

    peerlocal = pullop.remote.local()
    if peerlocal:
        missing = set(peerlocal.requirements) - pullop.repo.supported
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise error.Abort(msg)

    wlock = lock = None
    try:
        wlock = pullop.repo.wlock()
        lock = pullop.repo.lock()
        pullop.trmanager = transactionmanager(repo, 'pull', remote.url())
        streamclone.maybeperformlegacystreamclone(pullop)
        # This should ideally be in _pullbundle2(). However, it needs to run
        # before discovery to avoid extra work.
        _maybeapplyclonebundle(pullop)
        _pulldiscovery(pullop)
        if pullop.canusebundle2:
            _pullbundle2(pullop)
        _pullchangeset(pullop)
        _pullphase(pullop)
        _pullbookmarks(pullop)
        _pullobsolete(pullop)
        pullop.trmanager.close()
    finally:
        lockmod.release(pullop.trmanager, lock, wlock)

    return pullop

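# Usage sketch (hypothetical caller): pulling two specific heads from a peer.
# ``peer``, ``node1`` and ``node2`` are assumptions for illustration.
#
#   pullop = pull(repo, peer, heads=[node1, node2])
#   if pullop.cgresult:
#       repo.ui.status('new changesets were pulled\n')
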
# list of steps to perform discovery before pull
pulldiscoveryorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
pulldiscoverymapping = {}

def pulldiscovery(stepname):
    """decorator for functions performing discovery before pull

    The function is added to the step -> function mapping and appended to the
    list of steps. Beware that decorated functions will be added in order
    (this may matter).

    You can only use this decorator for a new step; if you want to wrap a step
    from an extension, change the pulldiscovery dictionary directly."""
    def dec(func):
        assert stepname not in pulldiscoverymapping
        pulldiscoverymapping[stepname] = func
        pulldiscoveryorder.append(stepname)
        return func
    return dec

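# Illustrative sketch (not part of the original module): to wrap an existing
# step, an extension edits the mapping directly instead of using the
# decorator, as the docstring above suggests.
#
#   origstep = pulldiscoverymapping['changegroup']
#   def wrappedstep(pullop):
#       pullop.repo.ui.debug('entering changegroup discovery\n')
#       return origstep(pullop)
#   pulldiscoverymapping['changegroup'] = wrappedstep
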
def _pulldiscovery(pullop):
    """Run all discovery steps"""
    for stepname in pulldiscoveryorder:
        step = pulldiscoverymapping[stepname]
        step(pullop)

@pulldiscovery('b1:bookmarks')
def _pullbookmarkbundle1(pullop):
    """fetch bookmark data in bundle1 case

    If not using bundle2, we have to fetch bookmarks before changeset
    discovery to reduce the chance and impact of race conditions."""
    if pullop.remotebookmarks is not None:
        return
    if pullop.canusebundle2 and 'listkeys' in pullop.remotebundle2caps:
        # all known bundle2 servers now support listkeys, but let's be nice
        # with new implementations.
        return
    pullop.remotebookmarks = pullop.remote.listkeys('bookmarks')


@pulldiscovery('changegroup')
def _pulldiscoverychangegroup(pullop):
    """discovery phase for the pull

    Currently handles changeset discovery only; will handle all discovery
    at some point."""
    tmp = discovery.findcommonincoming(pullop.repo,
                                       pullop.remote,
                                       heads=pullop.heads,
                                       force=pullop.force)
    common, fetch, rheads = tmp
    nm = pullop.repo.unfiltered().changelog.nodemap
    if fetch and rheads:
        # If a remote head is filtered locally, put it back in common.
        #
        # This is a hackish solution to catch most of the "common but locally
        # hidden" situations. We do not perform discovery on the unfiltered
        # repository because it ends up doing a pathological number of round
        # trips for a huge number of changesets we do not care about.
        #
        # If a set of such "common but filtered" changesets exists on the
        # server but does not include a remote head, we'll not be able to
        # detect it.
        scommon = set(common)
        for n in rheads:
            if n in nm:
                if n not in scommon:
                    common.append(n)
        if set(rheads).issubset(set(common)):
            fetch = []
    pullop.common = common
    pullop.fetch = fetch
    pullop.rheads = rheads

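# Worked example of the fix-up above: suppose remote head X exists locally but
# is hidden (e.g. obsoleted), so discovery reported it as missing. X is found
# in the unfiltered nodemap and moved back into `common`; if every remote head
# ends up in `common`, there is nothing left to fetch.
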
def _pullbundle2(pullop):
    """pull data using bundle2

    For now, the only supported data is the changegroup."""
    kwargs = {'bundlecaps': caps20to10(pullop.repo)}

    # At the moment we don't do stream clones over bundle2. If that is
    # implemented then here's where the check for that will go.
    streaming = False

    # pulling changegroup
    pullop.stepsdone.add('changegroup')

    kwargs['common'] = pullop.common
    kwargs['heads'] = pullop.heads or pullop.rheads
    kwargs['cg'] = pullop.fetch

    ui = pullop.repo.ui
    legacyphase = 'phases' in ui.configlist('devel', 'legacy.exchange')
    if (not legacyphase and
            'heads' in pullop.remotebundle2caps.get('phases', ())):
        kwargs['phases'] = True
        pullop.stepsdone.add('phases')

    if 'listkeys' in pullop.remotebundle2caps:
        if 'phases' not in pullop.stepsdone:
            kwargs['listkeys'] = ['phases']
        if pullop.remotebookmarks is None:
            # make sure to always include bookmark data when migrating
            # `hg incoming --bundle` to using this function.
            kwargs.setdefault('listkeys', []).append('bookmarks')

    # If this is a full pull / clone and the server supports the clone bundles
    # feature, tell the server whether we attempted a clone bundle. The
    # presence of this flag indicates the client supports clone bundles. This
    # will enable the server to treat clients that support clone bundles
    # differently from those that don't.
    if (pullop.remote.capable('clonebundles')
            and pullop.heads is None and list(pullop.common) == [nullid]):
        kwargs['cbattempted'] = pullop.clonebundleattempted

    if streaming:
        pullop.repo.ui.status(_('streaming all changes\n'))
    elif not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
    else:
        if pullop.heads is None and list(pullop.common) == [nullid]:
            pullop.repo.ui.status(_("requesting all changes\n"))
    if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
        remoteversions = bundle2.obsmarkersversion(pullop.remotebundle2caps)
        if obsolete.commonversion(remoteversions) is not None:
            kwargs['obsmarkers'] = True
            pullop.stepsdone.add('obsmarkers')
    _pullbundle2extraprepare(pullop, kwargs)
    bundle = pullop.remote.getbundle('pull', **pycompat.strkwargs(kwargs))
    try:
        op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
    except bundle2.AbortFromPart as exc:
        pullop.repo.ui.status(_('remote: abort: %s\n') % exc)
        raise error.Abort(_('pull failed on remote'), hint=exc.hint)
    except error.BundleValueError as exc:
        raise error.Abort(_('missing support for %s') % exc)

    if pullop.fetch:
        pullop.cgresult = bundle2.combinechangegroupresults(op)

-    # If the bundle had a phase-heads part, then phase exchange is already done
-    if op.records['phase-heads']:
-        pullop.stepsdone.add('phases')
-
    # processing phases change
    for namespace, value in op.records['listkeys']:
        if namespace == 'phases':
            _pullapplyphases(pullop, value)

    # processing bookmark update
    for namespace, value in op.records['listkeys']:
        if namespace == 'bookmarks':
            pullop.remotebookmarks = value

    # bookmark data were either already there or pulled in the bundle
    if pullop.remotebookmarks is not None:
        _pullbookmarks(pullop)

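# For reference, a typical `getbundle` argument set assembled above might
# look like the following (values are illustrative, not captured output):
#
#   kwargs = {'bundlecaps': {'HG20', 'bundle2=...'},
#             'common': [nullid], 'heads': rheads, 'cg': True,
#             'listkeys': ['phases', 'bookmarks']}
#
# Each item requested through the bundle is mirrored in `pullop.stepsdone`
# so the legacy (non-bundle2) code paths that run afterwards know to skip
# that step.
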
def _pullbundle2extraprepare(pullop, kwargs):
    """hook function so that extensions can extend the getbundle call"""
    pass

def _pullchangeset(pullop):
    """pull changeset from unbundle into the local repo"""
    # We delay opening the transaction as late as possible so we don't open
    # a transaction for nothing and don't break future useful rollback
    # calls.
    if 'changegroup' in pullop.stepsdone:
        return
    pullop.stepsdone.add('changegroup')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
        return
    tr = pullop.gettransaction()
    if pullop.heads is None and list(pullop.common) == [nullid]:
        pullop.repo.ui.status(_("requesting all changes\n"))
    elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
        # issue1320, avoid a race if remote changed after discovery
        pullop.heads = pullop.rheads

    if pullop.remote.capable('getbundle'):
        # TODO: get bundlecaps from remote
        cg = pullop.remote.getbundle('pull', common=pullop.common,
                                     heads=pullop.heads or pullop.rheads)
    elif pullop.heads is None:
        cg = pullop.remote.changegroup(pullop.fetch, 'pull')
    elif not pullop.remote.capable('changegroupsubset'):
        raise error.Abort(_("partial pull cannot be done because "
                            "other repository doesn't support "
                            "changegroupsubset."))
    else:
        cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
    bundleop = bundle2.applybundle(pullop.repo, cg, tr, 'pull',
                                   pullop.remote.url())
    pullop.cgresult = bundle2.combinechangegroupresults(bundleop)

def _pullphase(pullop):
    # Get remote phases data from remote
    if 'phases' in pullop.stepsdone:
        return
    remotephases = pullop.remote.listkeys('phases')
    _pullapplyphases(pullop, remotephases)

def _pullapplyphases(pullop, remotephases):
    """apply phase movement from observed remote state"""
    if 'phases' in pullop.stepsdone:
        return
    pullop.stepsdone.add('phases')
    publishing = bool(remotephases.get('publishing', False))
    if remotephases and not publishing:
        # remote is new and non-publishing
        pheads, _dr = phases.analyzeremotephases(pullop.repo,
                                                 pullop.pulledsubset,
                                                 remotephases)
        dheads = pullop.pulledsubset
    else:
        # Remote is old or publishing; all common changesets
        # should be seen as public
        pheads = pullop.pulledsubset
        dheads = []
    unfi = pullop.repo.unfiltered()
    phase = unfi._phasecache.phase
    rev = unfi.changelog.nodemap.get
    public = phases.public
    draft = phases.draft

    # exclude changesets already public locally and update the others
    pheads = [pn for pn in pheads if phase(unfi, rev(pn)) > public]
    if pheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, public, pheads)

    # exclude changesets already draft locally and update the others
    dheads = [pn for pn in dheads if phase(unfi, rev(pn)) > draft]
    if dheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, draft, dheads)

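# Note on the boundary moves above: `phases.advanceboundary` only moves
# changesets toward a more public phase, so pulling can publish local
# drafts (for example, when the remote is publishing) but never turns
# public changesets back into drafts. The two passes first advance the
# public boundary to `pheads`, then the draft boundary to `dheads`.
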
def _pullbookmarks(pullop):
    """process the remote bookmark information to update the local ones"""
    if 'bookmarks' in pullop.stepsdone:
        return
    pullop.stepsdone.add('bookmarks')
    repo = pullop.repo
    remotebookmarks = pullop.remotebookmarks
    remotebookmarks = bookmod.unhexlifybookmarks(remotebookmarks)
    bookmod.updatefromremote(repo.ui, repo, remotebookmarks,
                             pullop.remote.url(),
                             pullop.gettransaction,
                             explicit=pullop.explicitbookmarks)

def _pullobsolete(pullop):
    """utility function to pull obsolete markers from a remote

    `gettransaction` is a function that returns the pull transaction,
    creating one if necessary. We return the transaction to inform the
    calling code that a new transaction has been created (when applicable).

    Exists mostly to allow overriding for experimentation purposes."""
    if 'obsmarkers' in pullop.stepsdone:
        return
    pullop.stepsdone.add('obsmarkers')
    tr = None
    if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
        pullop.repo.ui.debug('fetching remote obsolete markers\n')
        remoteobs = pullop.remote.listkeys('obsolete')
        if 'dump0' in remoteobs:
            tr = pullop.gettransaction()
            markers = []
            for key in sorted(remoteobs, reverse=True):
                if key.startswith('dump'):
                    data = util.b85decode(remoteobs[key])
                    version, newmarks = obsolete._readmarkers(data)
                    markers += newmarks
            if markers:
                pullop.repo.obsstore.add(tr, markers)
            pullop.repo.invalidatevolatilesets()
    return tr

def caps20to10(repo):
    """return a set with appropriate options to use bundle20 during getbundle"""
    caps = {'HG20'}
    capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
    caps.add('bundle2=' + urlreq.quote(capsblob))
    return caps

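# The resulting capability set looks roughly like this (blob shortened,
# illustrative only):
#
#   {'HG20', 'bundle2=HG20%0Achangegroup%3D01%2C02%0A...'}
#
# i.e. the bundle2 capabilities are urlquoted into a single 'bundle2=...'
# entry so they can ride along in the bundle10-era `bundlecaps` argument.
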
# List of names of steps to perform for a bundle2 for getbundle, order matters.
getbundle2partsorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
getbundle2partsmapping = {}

def getbundle2partsgenerator(stepname, idx=None):
    """decorator for functions generating a bundle2 part for getbundle

    The function is added to the step -> function mapping and appended to the
    list of steps. Beware that decorated functions will be added in order
    (this may matter).

    You can only use this decorator for new steps; if you want to wrap a step
    from an extension, modify the getbundle2partsmapping dictionary
    directly."""
    def dec(func):
        assert stepname not in getbundle2partsmapping
        getbundle2partsmapping[stepname] = func
        if idx is None:
            getbundle2partsorder.append(stepname)
        else:
            getbundle2partsorder.insert(idx, stepname)
        return func
    return dec

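# A sketch of how an extension could register a new step with the decorator
# above ('myextdata' and its payload are made-up names, not part of this
# module):
#
#   @getbundle2partsgenerator('myextdata')
#   def _getbundlemyextdatapart(bundler, repo, source, bundlecaps=None,
#                               b2caps=None, **kwargs):
#       bundler.newpart('myextdata', data=b'...')
#
# The step name lands in getbundle2partsorder and the function in
# getbundle2partsmapping, so getbundlechunks() will call it in order.
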
def bundle2requested(bundlecaps):
    if bundlecaps is not None:
        return any(cap.startswith('HG2') for cap in bundlecaps)
    return False

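# Example: bundle2requested({'HG20', 'bundle2=...'}) is True because of the
# 'HG2' prefix match, while bundle2requested(None), when no bundlecaps were
# sent at all, is False and selects the bundle10 path below.
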
def getbundlechunks(repo, source, heads=None, common=None, bundlecaps=None,
                    **kwargs):
    """Return chunks constituting a bundle's raw data.

    Could be a bundle HG10 or a bundle HG20 depending on bundlecaps
    passed.

    Returns an iterator over raw chunks (of varying sizes).
    """
    kwargs = pycompat.byteskwargs(kwargs)
    usebundle2 = bundle2requested(bundlecaps)
    # bundle10 case
    if not usebundle2:
        if bundlecaps and not kwargs.get('cg', True):
            raise ValueError(_('request for bundle10 must include changegroup'))

        if kwargs:
            raise ValueError(_('unsupported getbundle arguments: %s')
                             % ', '.join(sorted(kwargs.keys())))
        outgoing = _computeoutgoing(repo, heads, common)
        return changegroup.makestream(repo, outgoing, '01', source,
                                      bundlecaps=bundlecaps)

    # bundle20 case
    b2caps = {}
    for bcaps in bundlecaps:
        if bcaps.startswith('bundle2='):
            blob = urlreq.unquote(bcaps[len('bundle2='):])
            b2caps.update(bundle2.decodecaps(blob))
    bundler = bundle2.bundle20(repo.ui, b2caps)

    kwargs['heads'] = heads
    kwargs['common'] = common

    for name in getbundle2partsorder:
        func = getbundle2partsmapping[name]
        func(bundler, repo, source, bundlecaps=bundlecaps, b2caps=b2caps,
             **pycompat.strkwargs(kwargs))

    return bundler.getchunks()

@getbundle2partsgenerator('changegroup')
def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
                              b2caps=None, heads=None, common=None, **kwargs):
    """add a changegroup part to the requested bundle"""
    cgstream = None
    if kwargs.get('cg', True):
        # build changegroup bundle here.
        version = '01'
        cgversions = b2caps.get('changegroup')
        if cgversions:  # 3.1 and 3.2 ship with an empty value
            cgversions = [v for v in cgversions
                          if v in changegroup.supportedoutgoingversions(repo)]
            if not cgversions:
                raise ValueError(_('no common changegroup version'))
            version = max(cgversions)
        outgoing = _computeoutgoing(repo, heads, common)
        if outgoing.missing:
            cgstream = changegroup.makestream(repo, outgoing, version, source,
                                              bundlecaps=bundlecaps)

    if cgstream:
        part = bundler.newpart('changegroup', data=cgstream)
        if cgversions:
            part.addparam('version', version)
        part.addparam('nbchanges', '%d' % len(outgoing.missing),
                      mandatory=False)
        if 'treemanifest' in repo.requirements:
            part.addparam('treemanifest', '1')

@getbundle2partsgenerator('listkeys')
def _getbundlelistkeysparts(bundler, repo, source, bundlecaps=None,
                            b2caps=None, **kwargs):
    """add parts containing listkeys namespaces to the requested bundle"""
    listkeys = kwargs.get('listkeys', ())
    for namespace in listkeys:
        part = bundler.newpart('listkeys')
        part.addparam('namespace', namespace)
        keys = repo.listkeys(namespace).items()
        part.data = pushkey.encodekeys(keys)

@getbundle2partsgenerator('obsmarkers')
def _getbundleobsmarkerpart(bundler, repo, source, bundlecaps=None,
                            b2caps=None, heads=None, **kwargs):
    """add an obsolescence markers part to the requested bundle"""
    if kwargs.get('obsmarkers', False):
        if heads is None:
            heads = repo.heads()
        subset = [c.node() for c in repo.set('::%ln', heads)]
        markers = repo.obsstore.relevantmarkers(subset)
        markers = sorted(markers)
        bundle2.buildobsmarkerspart(bundler, markers)

@getbundle2partsgenerator('phases')
def _getbundlephasespart(bundler, repo, source, bundlecaps=None,
                         b2caps=None, heads=None, **kwargs):
    """add phase heads part to the requested bundle"""
    if kwargs.get('phases', False):
        if 'heads' not in b2caps.get('phases', ()):
            raise ValueError(_('no common phases exchange method'))
        if heads is None:
            heads = repo.heads()

        headsbyphase = collections.defaultdict(set)
        if repo.publishing():
            headsbyphase[phases.public] = heads
        else:
            # find the appropriate heads to move

            phase = repo._phasecache.phase
            node = repo.changelog.node
            rev = repo.changelog.rev
            for h in heads:
                headsbyphase[phase(repo, rev(h))].add(h)
            seenphases = list(headsbyphase.keys())

            # We do not handle anything but public and draft phases for now.
            if seenphases:
                assert max(seenphases) <= phases.draft

            # if client is pulling non-public changesets, we need to find
            # intermediate public heads.
            draftheads = headsbyphase.get(phases.draft, set())
            if draftheads:
                publicheads = headsbyphase.get(phases.public, set())

                revset = 'heads(only(%ln, %ln) and public())'
                extraheads = repo.revs(revset, draftheads, publicheads)
                for r in extraheads:
                    headsbyphase[phases.public].add(node(r))

        # transform data in a format used by the encoding function
        phasemapping = []
        for phase in phases.allphases:
            phasemapping.append(sorted(headsbyphase[phase]))

        # generate the actual part
        phasedata = phases.binaryencode(phasemapping)
        bundler.newpart('phase-heads', data=phasedata)

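# The `phasemapping` built above is a list indexed by phase number: entry 0
# holds the public heads, entry 1 the draft heads, and so on for every
# phase in phases.allphases; phases.binaryencode then serializes those node
# lists into the binary payload of the 'phase-heads' part.
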
@getbundle2partsgenerator('hgtagsfnodes')
def _getbundletagsfnodes(bundler, repo, source, bundlecaps=None,
                         b2caps=None, heads=None, common=None,
                         **kwargs):
    """Transfer the .hgtags filenodes mapping.

    Only values for heads in this bundle will be transferred.

    The part data consists of pairs of 20 byte changeset node and .hgtags
    filenodes raw values.
    """
    # Don't send unless:
    # - changesets are being exchanged,
    # - the client supports it.
    if not (kwargs.get('cg', True) and 'hgtagsfnodes' in b2caps):
        return

    outgoing = _computeoutgoing(repo, heads, common)
    bundle2.addparttagsfnodescache(repo, bundler, outgoing)

def _getbookmarks(repo, **kwargs):
    """Returns bookmark to node mapping.

    This function is primarily used to generate the `bookmarks` bundle2 part.
    It is a separate function in order to make it easy to wrap it
    in extensions. Passing `kwargs` to the function makes it easy to
    add new parameters in extensions.
    """

    return dict(bookmod.listbinbookmarks(repo))

def check_heads(repo, their_heads, context):
    """check if the heads of a repo have been modified

    Used by peer for unbundling.
    """
    heads = repo.heads()
    heads_hash = hashlib.sha1(''.join(sorted(heads))).digest()
    if not (their_heads == ['force'] or their_heads == heads or
            their_heads == ['hashed', heads_hash]):
        # someone else committed/pushed/unbundled while we
        # were transferring data
        raise error.PushRaced('repository changed while %s - '
                              'please try again' % context)

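# The 'hashed' form compared above is meant to match what the client
# computed from the heads it observed during discovery: a sha1 over the
# concatenation of the sorted binary head nodes. For instance
# (hypothetical two-head repo):
#
#   heads_hash = hashlib.sha1(''.join(sorted([node1, node2]))).digest()
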
def unbundle(repo, cg, heads, source, url):
    """Apply a bundle to a repo.

    This function makes sure the repo is locked during the application and
    has a mechanism to check that no push race occurred between the creation
    of the bundle and its application.

    If the push was raced, a PushRaced exception is raised."""
    r = 0
    # need a transaction when processing a bundle2 stream
    # [wlock, lock, tr] - needs to be an array so nested functions can modify it
    lockandtr = [None, None, None]
    recordout = None
    # quick fix for output mismatch with bundle2 in 3.4
    captureoutput = repo.ui.configbool('experimental', 'bundle2-output-capture')
    if url.startswith('remote:http:') or url.startswith('remote:https:'):
        captureoutput = True
    try:
        # note: outside bundle1, 'heads' is expected to be empty and this
        # 'check_heads' call will be a no-op
        check_heads(repo, heads, 'uploading changes')
        # push can proceed
        if not isinstance(cg, bundle2.unbundle20):
            # legacy case: bundle1 (changegroup 01)
            txnname = "\n".join([source, util.hidepassword(url)])
            with repo.lock(), repo.transaction(txnname) as tr:
                op = bundle2.applybundle(repo, cg, tr, source, url)
                r = bundle2.combinechangegroupresults(op)
        else:
            r = None
            try:
                def gettransaction():
                    if not lockandtr[2]:
                        lockandtr[0] = repo.wlock()
                        lockandtr[1] = repo.lock()
                        lockandtr[2] = repo.transaction(source)
                        lockandtr[2].hookargs['source'] = source
                        lockandtr[2].hookargs['url'] = url
                        lockandtr[2].hookargs['bundle2'] = '1'
                    return lockandtr[2]

                # Do greedy locking by default until we're satisfied with lazy
                # locking.
                if not repo.ui.configbool('experimental', 'bundle2lazylocking'):
                    gettransaction()

                op = bundle2.bundleoperation(repo, gettransaction,
                                             captureoutput=captureoutput)
                try:
                    op = bundle2.processbundle(repo, cg, op=op)
                finally:
                    r = op.reply
                    if captureoutput and r is not None:
                        repo.ui.pushbuffer(error=True, subproc=True)
                        def recordout(output):
                            r.newpart('output', data=output, mandatory=False)
                if lockandtr[2] is not None:
                    lockandtr[2].close()
            except BaseException as exc:
                exc.duringunbundle2 = True
                if captureoutput and r is not None:
                    parts = exc._bundle2salvagedoutput = r.salvageoutput()
                    def recordout(output):
                        part = bundle2.bundlepart('output', data=output,
                                                  mandatory=False)
                        parts.append(part)
                raise
    finally:
        lockmod.release(lockandtr[2], lockandtr[1], lockandtr[0])
        if recordout is not None:
            recordout(repo.ui.popbuffer())
    return r

def _maybeapplyclonebundle(pullop):
    """Apply a clone bundle from a remote, if possible."""

    repo = pullop.repo
    remote = pullop.remote

    if not repo.ui.configbool('ui', 'clonebundles'):
        return

    # Only run if local repo is empty.
    if len(repo):
        return

    if pullop.heads:
        return

    if not remote.capable('clonebundles'):
        return

    res = remote._call('clonebundles')

    # If we call the wire protocol command, that's good enough to record the
    # attempt.
    pullop.clonebundleattempted = True

    entries = parseclonebundlesmanifest(repo, res)
    if not entries:
        repo.ui.note(_('no clone bundles available on remote; '
                       'falling back to regular clone\n'))
        return

    entries = filterclonebundleentries(repo, entries)
    if not entries:
        # There is a thundering herd concern here. However, if a server
        # operator doesn't advertise bundles appropriate for its clients,
        # they deserve what's coming. Furthermore, from a client's
        # perspective, no automatic fallback would mean not being able to
        # clone!
        repo.ui.warn(_('no compatible clone bundles available on server; '
                       'falling back to regular clone\n'))
        repo.ui.warn(_('(you may want to report this to the server '
                       'operator)\n'))
        return

    entries = sortclonebundleentries(repo.ui, entries)

    url = entries[0]['URL']
    repo.ui.status(_('applying clone bundle from %s\n') % url)
    if trypullbundlefromurl(repo.ui, repo, url):
        repo.ui.status(_('finished applying clone bundle\n'))
    # Bundle failed.
    #
    # We abort by default to avoid the thundering herd of
    # clients flooding a server that was expecting expensive
    # clone load to be offloaded.
    elif repo.ui.configbool('ui', 'clonebundlefallback'):
        repo.ui.warn(_('falling back to normal clone\n'))
    else:
        raise error.Abort(_('error applying bundle'),
                          hint=_('if this error persists, consider contacting '
                                 'the server operator or disable clone '
                                 'bundles via '
                                 '"--config ui.clonebundles=false"'))

def parseclonebundlesmanifest(repo, s):
    """Parses the raw text of a clone bundles manifest.

    Returns a list of dicts. Each dict has a ``URL`` key corresponding
    to the URL; the other keys are the attributes for the entry.
    """
    m = []
    for line in s.splitlines():
        fields = line.split()
        if not fields:
            continue
        attrs = {'URL': fields[0]}
        for rawattr in fields[1:]:
            key, value = rawattr.split('=', 1)
            key = urlreq.unquote(key)
            value = urlreq.unquote(value)
            attrs[key] = value

            # Parse BUNDLESPEC into components. This makes client-side
            # preferences easier to specify since you can prefer a single
            # component of the BUNDLESPEC.
            if key == 'BUNDLESPEC':
                try:
                    comp, version, params = parsebundlespec(repo, value,
                                                            externalnames=True)
                    attrs['COMPRESSION'] = comp
                    attrs['VERSION'] = version
                except error.InvalidBundleSpecification:
                    pass
                except error.UnsupportedBundleSpecification:
                    pass

        m.append(attrs)

    return m

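# An illustrative manifest line (URL and attribute values are made up):
#
#   https://example.com/full.hg BUNDLESPEC=gzip-v2 REQUIRESNI=true
#
# would parse to a dict along the lines of:
#
#   {'URL': 'https://example.com/full.hg', 'BUNDLESPEC': 'gzip-v2',
#    'COMPRESSION': 'gzip', 'VERSION': 'v2', 'REQUIRESNI': 'true'}
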
def filterclonebundleentries(repo, entries):
    """Remove incompatible clone bundle manifest entries.

    Accepts a list of entries parsed with ``parseclonebundlesmanifest``
    and returns a new list consisting of only the entries that this client
    should be able to apply.

    There is no guarantee we'll be able to apply all returned entries because
    the metadata we use to filter on may be missing or wrong.
    """
    newentries = []
    for entry in entries:
        spec = entry.get('BUNDLESPEC')
        if spec:
            try:
                parsebundlespec(repo, spec, strict=True)
            except error.InvalidBundleSpecification as e:
                repo.ui.debug(str(e) + '\n')
                continue
            except error.UnsupportedBundleSpecification as e:
                repo.ui.debug('filtering %s because unsupported bundle '
                              'spec: %s\n' % (entry['URL'], str(e)))
                continue

        if 'REQUIRESNI' in entry and not sslutil.hassni:
            repo.ui.debug('filtering %s because SNI not supported\n' %
                          entry['URL'])
            continue

        newentries.append(entry)

    return newentries

class clonebundleentry(object):
    """Represents an item in a clone bundles manifest.

    This rich class is needed to support sorting since sorted() in Python 3
    doesn't support ``cmp`` and our comparison is complex enough that ``key=``
    won't work.
    """

    def __init__(self, value, prefers):
        self.value = value
        self.prefers = prefers

    def _cmp(self, other):
        for prefkey, prefvalue in self.prefers:
            avalue = self.value.get(prefkey)
            bvalue = other.value.get(prefkey)

            # Special case for b missing attribute and a matches exactly.
            if avalue is not None and bvalue is None and avalue == prefvalue:
                return -1

            # Special case for a missing attribute and b matches exactly.
            if bvalue is not None and avalue is None and bvalue == prefvalue:
                return 1

            # We can't compare unless attribute present on both.
            if avalue is None or bvalue is None:
                continue

            # Same values should fall back to next attribute.
            if avalue == bvalue:
                continue

            # Exact matches come first.
            if avalue == prefvalue:
                return -1
            if bvalue == prefvalue:
                return 1

            # Fall back to next attribute.
            continue

        # If we got here we couldn't sort by attributes and prefers. Fall
        # back to index order.
        return 0

    def __lt__(self, other):
        return self._cmp(other) < 0

    def __gt__(self, other):
        return self._cmp(other) > 0

    def __eq__(self, other):
        return self._cmp(other) == 0

    def __le__(self, other):
        return self._cmp(other) <= 0

    def __ge__(self, other):
        return self._cmp(other) >= 0

    def __ne__(self, other):
        return self._cmp(other) != 0

def sortclonebundleentries(ui, entries):
    prefers = ui.configlist('ui', 'clonebundleprefers')
    if not prefers:
        return list(entries)

    prefers = [p.split('=', 1) for p in prefers]

    items = sorted(clonebundleentry(v, prefers) for v in entries)
    return [i.value for i in items]

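# Sorting is driven by the ui.clonebundleprefers list; for example a config
# such as this (illustrative values):
#
#   [ui]
#   clonebundleprefers = COMPRESSION=zstd, VERSION=v2
#
# makes entries whose COMPRESSION attribute equals 'zstd' sort first, with
# VERSION=v2 as the tie-breaker, per clonebundleentry._cmp above.
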
def trypullbundlefromurl(ui, repo, url):
    """Attempt to apply a bundle from a URL."""
    with repo.lock(), repo.transaction('bundleurl') as tr:
        try:
            fh = urlmod.open(ui, url)
            cg = readbundle(ui, fh, 'stream')

            if isinstance(cg, streamclone.streamcloneapplier):
                cg.apply(repo)
            else:
                bundle2.applybundle(repo, cg, tr, 'clonebundles', url)
            return True
        except urlerr.httperror as e:
            ui.warn(_('HTTP error fetching bundle: %s\n') % str(e))
        except urlerr.urlerror as e:
            ui.warn(_('error fetching bundle: %s\n') % e.reason)

    return False