bundle2.getunbundler: rename "header" to "magicstring"...
Pierre-Yves David - r25640:39f0064a default
@@ -1,1410 +1,1410 @@
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows:

- magic string
- stream level parameters
- payload parts (any number)
- end of stream marker.

The binary format
=================

All numbers are unsigned and big-endian.

Stream level parameters
-----------------------

The binary format is as follows:

:params size: int32

    The total number of bytes used by the parameters.

:params value: arbitrary number of bytes

    A blob of `params size` bytes containing the serialized version of all
    stream level parameters.

    The blob contains a space separated list of parameters. Parameters with a
    value are stored in the form `<name>=<value>`. Both name and value are
    urlquoted.

    Empty names are forbidden.

    Names MUST start with a letter. If this first letter is lower case, the
    parameter is advisory and can be safely ignored. However, when the first
    letter is capital, the parameter is mandatory and the bundling process
    MUST stop if it is not able to process it.

Stream parameters use a simple textual format for two main reasons:

- Stream level parameters should remain simple and we want to discourage any
  crazy usage.
- Textual data allow easy human inspection of a bundle2 header in case of
  trouble.

Any application level options MUST go into a bundle2 part instead.

Payload part
------------

The binary format is as follows:

:header size: int32

    The total number of bytes used by the part header. When the header is
    empty (size = 0) this is interpreted as the end of stream marker.

:header:

    The header defines how to interpret the part. It contains two pieces of
    data: the part type, and the part parameters.

    The part type is used to route the part to an application level handler
    that can interpret the payload.

    Part parameters are passed to the application level handler. They are
    meant to convey information that will help the application level object
    to interpret the part payload.

    The binary format of the header is as follows:

    :typesize: (one byte)

    :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

    :partid: A 32-bit integer (unique in the bundle) that can be used to
             refer to this part.

    :parameters:

        Part parameters may have arbitrary content; the binary structure is::

            <mandatory-count><advisory-count><param-sizes><param-data>

        :mandatory-count: 1 byte, number of mandatory parameters

        :advisory-count: 1 byte, number of advisory parameters

        :param-sizes:

            N pairs of bytes, where N is the total number of parameters. Each
            pair contains (<size-of-key>, <size-of-value>) for one parameter.

        :param-data:

            A blob of bytes from which each parameter key and value can be
            retrieved using the list of size pairs stored in the previous
            field.

            Mandatory parameters come first, then the advisory ones.

            Each parameter's key MUST be unique within the part.

:payload:

    The payload is a series of `<chunksize><chunkdata>` chunks.

    `chunksize` is an int32; `chunkdata` is plain bytes (as many as
    `chunksize` says). The payload is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

Bundle processing
=================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part
type contains any uppercase character it is considered mandatory. When no
handler is known for a mandatory part, the process is aborted and an
exception is raised. If the part is advisory and no handler is known, the
part is ignored. When the process is aborted, the full bundle is still read
from the stream to keep the channel usable, but none of the parts read after
the abort are processed. In the future, dropping the stream may become an
option for channels we do not care to preserve.
"""

import errno
import sys
import util
import struct
import urllib
import string
import obsolete
import pushkey
import url
import re

import changegroup, error, tags
from i18n import _

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
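
def _exampleminimalstream():
    """Illustrative sketch, not used by the rest of this file: frame the
    smallest well-formed HG20 stream by hand.

    Per the module docstring, that is the magic string, an empty stream
    parameter block, and the end-of-stream marker (a part header of size
    zero)."""
    stream = 'HG20'                        # magic string: 'HG' + version '20'
    stream += _pack(_fstreamparamsize, 0)  # no stream level parameters
    stream += _pack(_fpartheadersize, 0)   # empty part header: end of stream
    assert stream == 'HG20' + '\x00' * 8
    return stream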

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically, e.g. _makefpartparamsizes(2) == '>BBBB'.
    """
    return '>' + ('BB' * nbparams)

parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator
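
@parthandler('test:song', ('verses',))
def _examplesonghandler(op, part):
    """Illustrative sketch of a handler registration; the part type
    'test:song' and its 'verses' parameter are hypothetical.

    A real handler reads the part payload and records its results on the
    bundleoperation object."""
    op.records.add('test:song', {'verses': part.params.get('verses')})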

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)
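
def _examplerecords():
    """Illustrative sketch: how the unbundlerecords object above is used.

    Entries are grouped by category, kept in chronological order, and can be
    attached to the part they reply to."""
    records = unbundlerecords()
    records.add('changegroup', {'return': 1})
    records.add('output', 'remote: ok\n', inreplyto=0)
    assert records['changegroup'] == ({'return': 1},)
    assert len(records) == 2
    # replies are tracked per part id, in a nested unbundlerecords
    assert records.getreplies(0)['output'] == ('remote: ok\n',)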

class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object currently has very little content; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.gettransaction = transactiongetter
        self.reply = None
        self.captureoutput = captureoutput

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying its effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    This is a very early version of this function that will be strongly
    reworked before final usage.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op.gettransaction is None:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except BaseException, exc:
        for nbpart, part in iterparts:
            # consume the bundle content
            part.seek(0, 2)
        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from processing the old format. This is mostly needed
        # to handle different return codes to unbundle according to the type
        # of bundle. We should probably clean up or drop this return code
        # craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged
        raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.UnsupportedPartError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.UnsupportedPartError(parttype=part.type,
                                                 params=unknownparams)
            status = 'supported'
        except error.UnsupportedPartError, exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam('in-reply-to', str(part.id), mandatory=False)
    finally:
        # consume the part content to not corrupt the stream.
        part.seek(0, 2)


def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line).
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urllib.unquote(key)
        vals = [urllib.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urllib.quote(ca)
        vals = [urllib.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)
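
def _examplecaps():
    """Illustrative sketch: encodecaps/decodecaps round-trip.

    One capability per line, values comma separated, both urlquoted."""
    caps = {'HG20': [], 'changegroup': ['01', '02']}
    blob = encodecaps(caps)
    assert blob == 'HG20\nchangegroup=01,02'
    decoded = decodecaps(blob)
    assert decoded['changegroup'] == ['01', '02']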

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual application payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means that
        any failure to properly initialize the part after calling ``newpart``
        should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding a part if you
        need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param

        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urllib.quote(par)
            if value is not None:
                value = urllib.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged
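
def _examplebundle(ui):
    """Illustrative sketch: compose an outgoing bundle with the bundle20
    class above.

    `ui` is assumed to be a Mercurial ui object (e.g. repo.ui); the stream
    parameter and the part shown here are arbitrary examples."""
    bundler = bundle20(ui)
    bundler.addparam('ehello', 'world')  # lowercase first letter: advisory
    part = bundler.newpart('output', data='carbon copy', mandatory=False)
    part.addparam('in-reply-to', '0', mandatory=False)
    # concatenating the chunks yields the complete binary container
    return ''.join(bundler.getchunks())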


class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))

    def _unpack(self, format):
        """unpack this struct format from the stream"""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream"""
        return changegroup.readexactly(self._fp, size)

    def seek(self, offset, whence=0):
        """move the underlying file pointer"""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def tell(self):
        """return the file offset, or None if the file is not seekable"""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError, e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

-def getunbundler(ui, fp, header=None):
+def getunbundler(ui, fp, magicstring=None):
-    """return a valid unbundler object for a given header"""
+    """return a valid unbundler object for a given magicstring"""
-    if header is None:
+    if magicstring is None:
-        header = changegroup.readexactly(fp, 4)
+        magicstring = changegroup.readexactly(fp, 4)
-    magic, version = header[0:2], header[2:4]
+    magic, version = magicstring[0:2], magicstring[2:4]
     if magic != 'HG':
         raise util.Abort(_('not a Mercurial bundle'))
     unbundlerclass = formatmap.get(version)
     if unbundlerclass is None:
         raise util.Abort(_('unknown bundle version %s') % version)
     unbundler = unbundlerclass(ui, fp)
-    indebug(ui, 'start processing of %s stream' % header)
+    indebug(ui, 'start processing of %s stream' % magicstring)
     return unbundler

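def _exampleroundtrip(ui):
    """Illustrative sketch: feed a freshly built bundle back through
    getunbundler.

    A plain cStringIO buffer stands in for the wire; `ui` is assumed to be a
    Mercurial ui object. unbundlepart (defined further down in this file)
    exposes a file-like read()."""
    import cStringIO
    bundler = bundle20(ui)
    bundler.newpart('output', data='ping', mandatory=False)
    fp = cStringIO.StringIO(''.join(bundler.getchunks()))
    unbundler = getunbundler(ui, fp)  # reads and validates the magic string
    for part in unbundler.iterparts():
        assert part.type == 'output'
        assert part.read() == 'ping'
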
class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` methods."""

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            for p in self._readexact(paramssize).split(' '):
                p = p.split('=', 1)
                p = [urllib.unquote(i) for i in p]
                if len(p) < 2:
                    p.append(None)
                self._processparam(*p)
                params[p[0]] = p[1]
        return params

    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory, and this function will raise a KeyError when they are
        unknown.

        Note: no options are currently supported. Any input will either be
        ignored or fail.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        # Some logic will be later added here to try to process the option for
        # a dict of known parameters.
        if name[0].islower():
            indebug(self.ui, "ignoring unknown parameter %r" % name)
        else:
            raise error.UnsupportedPartError(params=(name,))


    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        return False

formatmap = {'20': unbundle20}

class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. The remote side
    should be able to safely ignore the advisory ones.

    Neither data nor parameters can be modified after the generation has
    begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise RuntimeError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to define the part content
    def __setdata(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data
    def __getdata(self):
        return self._data
    data = property(__getdata, __setdata)

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

773 # methods used to generates the bundle2 stream
773 # methods used to generates the bundle2 stream
774 def getchunks(self, ui):
774 def getchunks(self, ui):
775 if self._generated is not None:
775 if self._generated is not None:
776 raise RuntimeError('part can only be consumed once')
776 raise RuntimeError('part can only be consumed once')
777 self._generated = False
777 self._generated = False
778
778
779 if ui.debugflag:
779 if ui.debugflag:
780 msg = ['bundle2-output-part: "%s"' % self.type]
780 msg = ['bundle2-output-part: "%s"' % self.type]
781 if not self.mandatory:
781 if not self.mandatory:
782 msg.append(' (advisory)')
782 msg.append(' (advisory)')
783 nbmp = len(self.mandatoryparams)
783 nbmp = len(self.mandatoryparams)
784 nbap = len(self.advisoryparams)
784 nbap = len(self.advisoryparams)
785 if nbmp or nbap:
785 if nbmp or nbap:
786 msg.append(' (params:')
786 msg.append(' (params:')
787 if nbmp:
787 if nbmp:
788 msg.append(' %i mandatory' % nbmp)
788 msg.append(' %i mandatory' % nbmp)
789 if nbap:
789 if nbap:
790 msg.append(' %i advisory' % nbmp)
790 msg.append(' %i advisory' % nbmp)
791 msg.append(')')
791 msg.append(')')
792 if not self.data:
792 if not self.data:
793 msg.append(' empty payload')
793 msg.append(' empty payload')
794 elif util.safehasattr(self.data, 'next'):
794 elif util.safehasattr(self.data, 'next'):
795 msg.append(' streamed payload')
795 msg.append(' streamed payload')
796 else:
796 else:
797 msg.append(' %i bytes payload' % len(self.data))
797 msg.append(' %i bytes payload' % len(self.data))
798 msg.append('\n')
798 msg.append('\n')
799 ui.debug(''.join(msg))
799 ui.debug(''.join(msg))
800
800
801 #### header
801 #### header
802 if self.mandatory:
802 if self.mandatory:
803 parttype = self.type.upper()
803 parttype = self.type.upper()
804 else:
804 else:
805 parttype = self.type.lower()
805 parttype = self.type.lower()
806 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
806 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
807 ## parttype
807 ## parttype
808 header = [_pack(_fparttypesize, len(parttype)),
808 header = [_pack(_fparttypesize, len(parttype)),
809 parttype, _pack(_fpartid, self.id),
809 parttype, _pack(_fpartid, self.id),
810 ]
810 ]
811 ## parameters
811 ## parameters
812 # count
812 # count
813 manpar = self.mandatoryparams
813 manpar = self.mandatoryparams
814 advpar = self.advisoryparams
814 advpar = self.advisoryparams
815 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
815 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
816 # size
816 # size
817 parsizes = []
817 parsizes = []
818 for key, value in manpar:
818 for key, value in manpar:
819 parsizes.append(len(key))
819 parsizes.append(len(key))
820 parsizes.append(len(value))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except BaseException, exc:
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % exc)
            exc_info = sys.exc_info()
            msg = 'unexpected error: %s' % exc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            raise exc_info[0], exc_info[1], exc_info[2]
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if util.safehasattr(self.data, 'next'):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data

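# Illustrative sketch (not part of bundle2.py): how the chunk framing produced
# by getchunks() above looks on the wire. Each payload chunk is preceded by a
# signed big-endian int32 length; 0 terminates the payload and -1 announces an
# out of band interrupt part. The '>i' format is an assumption standing in
# for _fpayloadsize.
def _frameexample(chunks):
    """yield a framed byte stream for <chunks>, an iterable of strings"""
    for chunk in chunks:
        yield struct.pack('>i', len(chunk))
        yield chunk
    yield struct.pack('>i', 0)  # end-of-payload marker

# ''.join(_frameexample(['abc', 'de'])) ==
# '\x00\x00\x00\x03abc\x00\x00\x00\x02de\x00\x00\x00\x00'
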
flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting an exception raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise RuntimeError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = []  # (payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to setup all logic related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = dict(self.mandatoryparams)
        self.params.update(dict(self.advisoryparams))
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, super(unbundlepart, self).tell()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            super(unbundlepart, self).seek(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos,
                                             super(unbundlepart, self).tell()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % self.id)
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param values
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # the header is fully read, mark the part as initialized
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise util.Abort(_('Seek failed\n'))
            self._pos = newpos

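# Usage sketch (not part of bundle2.py): how a consumer might stream and then
# re-read a part payload through the file-like API above. `part` is assumed
# to be an unbundlepart obtained while iterating over an unbundler, and
# `process` a hypothetical consumer callback.
def _consumetwiceexample(part, process):
    data = part.read(4096)      # stream the payload in chunks
    while data:
        process(data)
        data = part.read(4096)
    part.seek(0)                # rewind, served from the chunk index
    return part.read()          # the whole payload, read in one go
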
# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urllib.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

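# Example (not part of bundle2.py): with a caps dict as returned by
# bundle2caps(), the advertised obsmarkers formats map back to integers:
#
# >>> obsmarkersversion({'obsmarkers': ('V0', 'V1')})
# [0, 1]
# >>> obsmarkersversion({'digests': ('sha1',)})
# []
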
@parthandler('changegroup', ('version', 'nbchanges'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will see massive rework before
    being inflicted on any end user.
    """
    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    unpacker = changegroup.packermap[unpackerversion][1]
    cg = unpacker(inpart, 'UN')
    # the source and url passed here are overwritten by the one contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2',
                                     expectedtotal=nbchangesets)
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

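# Sketch (not part of bundle2.py): registering a handler for a hypothetical
# part type through the same decorator. The second argument lists the
# parameter names the handler understands, so an unknown mandatory parameter
# can be rejected early.
#
# @parthandler('x-example', ('flavor',))
# def handleexample(op, inpart):
#     """record the payload of a hypothetical x-example part"""
#     flavor = inpart.params.get('flavor', 'plain')
#     op.records.add('x-example', {'flavor': flavor, 'data': inpart.read()})
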
_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                 ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate that what was
        retrieved by the client matches the server knowledge about the bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest with
        that name. Like the size, it is used to validate that what was
        retrieved by the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise util.Abort(_('remote-changegroup does not support %s urls') %
                         parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
                         % 'size')
    except KeyError:
        raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise util.Abort(_('remote-changegroup: missing "%s" param') %
                             param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise util.Abort(_('%s: not a bundle version 1.0') %
                         util.hidepassword(raw_url))
    ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except util.Abort, e:
        raise util.Abort(_('bundle at %s is corrupted:\n%s') %
                         (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

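# Sketch (not part of bundle2.py): what a server emitting such a part might
# do. `bundler` (a bundle20 instance), `bundleurl`, `bundlesize` and `sha1hex`
# are assumptions for illustration.
#
# part = bundler.newpart('remote-changegroup')
# part.addparam('url', bundleurl)
# part.addparam('size', '%d' % bundlesize)
# part.addparam('digests', 'sha1')
# part.addparam('digest:sha1', sha1hex)
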
@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    if heads != op.repo.heads():
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

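# Sketch (not part of bundle2.py): the pushing side builds this part from the
# remote heads it based its discovery on, e.g. (assuming a bundle20 instance
# `bundler` and a list `remoteheads` of 20-byte binary nodes):
#
# bundler.newpart('check:heads', data=iter(remoteheads))
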
@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(('remote: %s\n' % line))

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))

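# Sketch (not part of bundle2.py): a producer signals the abort by emitting
# such a part; the message text and the `bundler` instance are assumptions.
#
# part = bundler.newpart('error:abort')
# part.addparam('message', 'push rejected by hook')
# part.addparam('hint', 'contact the repository owner', mandatory=False)
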
@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.UnsupportedPartError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

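# Sketch (not part of bundle2.py): how the pushing side could encode the
# matching request (assuming a bundle20 instance `bundler` and hex strings
# `oldhex`/`newhex`); values go through pushkey.encode, mirroring the
# pushkey.decode calls above.
#
# part = bundler.newpart('pushkey')
# part.addparam('namespace', pushkey.encode('bookmarks'))
# part.addparam('key', pushkey.encode('my-bookmark'))
# part.addparam('old', pushkey.encode(oldhex))
# part.addparam('new', pushkey.encode(newhex))
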
@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers part"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('received incomplete .hgtags fnodes data, ignoring\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)

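# Sketch (not part of bundle2.py): the payload consumed above is simply the
# concatenation of (changeset node, .hgtags filenode) pairs, 40 bytes per
# entry with no framing, so a producer could build it as:
def _encodefnodesexample(pairs):
    """encode an iterable of (node, fnode) 20-byte string pairs"""
    return ''.join(node + fnode for node, fnode in pairs)
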
# exchange.py - utility to exchange data between repos.
#
# Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

import time
from i18n import _
from node import hex, nullid
import errno, urllib
import util, scmutil, changegroup, base85, error, store
import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey
import lock as lockmod
import tags

def readbundle(ui, fh, fname, vfs=None):
    header = changegroup.readexactly(fh, 4)

    alg = None
    if not fname:
        fname = "stream"
    if not header.startswith('HG') and header.startswith('\0'):
        fh = changegroup.headerlessfixup(fh, header)
        header = "HG10"
        alg = 'UN'
    elif vfs:
        fname = vfs.join(fname)

    magic, version = header[0:2], header[2:4]

    if magic != 'HG':
        raise util.Abort(_('%s: not a Mercurial bundle') % fname)
    if version == '10':
        if alg is None:
            alg = changegroup.readexactly(fh, 2)
        return changegroup.cg1unpacker(fh, alg)
    elif version.startswith('2'):
        return bundle2.getunbundler(ui, fh, magicstring=magic + version)
    else:
        raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))

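# Usage sketch (not part of exchange.py): reading a bundle from disk. The
# file name is hypothetical; `ui` is assumed to be a ui instance.
#
# fh = open('changesets.hg', 'rb')
# unbundler = readbundle(ui, fh, 'changesets.hg')
# # cg1unpacker for HG10* files, a bundle2 unbundler for HG2* streams
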
def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if markers:
        remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
        version = obsolete.commonversion(remoteversions)
        if version is None:
            raise ValueError('bundler do not support common obsmarker format')
        stream = obsolete.encodemarkers(markers, True, version=version)
        return bundler.newpart('obsmarkers', data=stream)
    return None

def _canusebundle2(op):
    """return true if a pull/push can use bundle2

    Feel free to nuke this function when we drop the experimental option"""
    return (op.repo.ui.configbool('experimental', 'bundle2-exp', True)
            and op.remote.capable('bundle2'))


class pushoperation(object):
    """An object that represents a single push operation

    Its purpose is to carry push-related state and very common operations.

    A new one should be created at the beginning of each push and discarded
    afterward.
    """

    def __init__(self, repo, remote, force=False, revs=None, newbranch=False,
                 bookmarks=()):
        # repo we push from
        self.repo = repo
        self.ui = repo.ui
        # repo we push to
        self.remote = remote
        # force option provided
        self.force = force
        # revs to be pushed (None is "all")
        self.revs = revs
        # bookmarks explicitly pushed
        self.bookmarks = bookmarks
        # allow push of new branch
        self.newbranch = newbranch
        # did a local lock get acquired?
        self.locallocked = None
        # steps already performed
        # (used to check what steps have been already performed through bundle2)
        self.stepsdone = set()
        # Integer version of the changegroup push result
        # - None means nothing to push
        # - 0 means HTTP error
        # - 1 means we pushed and remote head count is unchanged *or*
        #   we have outgoing changesets but refused to push
        # - other values as described by addchangegroup()
        self.cgresult = None
        # Boolean value for the bookmark push
        self.bkresult = None
        # discover.outgoing object (contains common and outgoing data)
        self.outgoing = None
        # all remote heads before the push
        self.remoteheads = None
        # testable as a boolean indicating if any nodes are missing locally.
        self.incoming = None
        # phases changes that must be pushed alongside the changesets
        self.outdatedphases = None
        # phases changes that must be pushed if changeset push fails
        self.fallbackoutdatedphases = None
        # outgoing obsmarkers
        self.outobsmarkers = set()
        # outgoing bookmarks
        self.outbookmarks = []
        # transaction manager
        self.trmanager = None
        # map { pushkey partid -> callback handling failure}
        # used to handle exception from mandatory pushkey part failure
        self.pkfailcb = {}

    @util.propertycache
    def futureheads(self):
        """future remote heads if the changeset push succeeds"""
        return self.outgoing.missingheads

    @util.propertycache
    def fallbackheads(self):
        """future remote heads if the changeset push fails"""
        if self.revs is None:
            # no target to push, all common heads are relevant
            return self.outgoing.commonheads
        unfi = self.repo.unfiltered()
        # I want cheads = heads(::missingheads and ::commonheads)
        # (missingheads is revs with secret changeset filtered out)
        #
        # This can be expressed as:
        #     cheads = ( (missingheads and ::commonheads)
        #              + (commonheads and ::missingheads))
        #
        # while trying to push we already computed the following:
        #     common = (::commonheads)
        #     missing = ((commonheads::missingheads) - commonheads)
        #
        # We can pick:
        # * missingheads part of common (::commonheads)
        common = set(self.outgoing.common)
        nm = self.repo.changelog.nodemap
        cheads = [node for node in self.revs if nm[node] in common]
        # and
        # * commonheads parents on missing
        revset = unfi.set('%ln and parents(roots(%ln))',
                          self.outgoing.commonheads,
                          self.outgoing.missing)
        cheads.extend(c.node() for c in revset)
        return cheads

    @property
    def commonheads(self):
        """set of all common heads after changeset bundle push"""
        if self.cgresult:
            return self.futureheads
        else:
            return self.fallbackheads

# mapping of messages used when pushing bookmarks
bookmsgmap = {'update': (_("updating bookmark %s\n"),
                         _('updating bookmark %s failed!\n')),
              'export': (_("exporting bookmark %s\n"),
                         _('exporting bookmark %s failed!\n')),
              'delete': (_("deleting remote bookmark %s\n"),
                         _('deleting remote bookmark %s failed!\n')),
              }


179 def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=()):
179 def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=()):
180 '''Push outgoing changesets (limited by revs) from a local
180 '''Push outgoing changesets (limited by revs) from a local
181 repository to remote. Return an integer:
181 repository to remote. Return an integer:
182 - None means nothing to push
182 - None means nothing to push
183 - 0 means HTTP error
183 - 0 means HTTP error
184 - 1 means we pushed and remote head count is unchanged *or*
184 - 1 means we pushed and remote head count is unchanged *or*
185 we have outgoing changesets but refused to push
185 we have outgoing changesets but refused to push
186 - other values as described by addchangegroup()
186 - other values as described by addchangegroup()
187 '''
187 '''
188 pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks)
188 pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks)
189 if pushop.remote.local():
189 if pushop.remote.local():
190 missing = (set(pushop.repo.requirements)
    missing = (set(pushop.repo.requirements)
               - pushop.remote.local().supported)
    if missing:
        msg = _("required features are not"
                " supported in the destination:"
                " %s") % (', '.join(sorted(missing)))
        raise util.Abort(msg)

    # there are two ways to push to remote repo:
    #
    # addchangegroup assumes local user can lock remote
    # repo (local filesystem, old ssh servers).
    #
    # unbundle assumes local user cannot lock remote repo (new ssh
    # servers, http servers).

    if not pushop.remote.canpush():
        raise util.Abort(_("destination does not support push"))
    # get local lock as we might write phase data
    localwlock = locallock = None
    try:
        # bundle2 push may receive a reply bundle touching bookmarks or other
        # things requiring the wlock. Take it now to ensure proper ordering.
        maypushback = pushop.ui.configbool('experimental', 'bundle2.pushback')
        if _canusebundle2(pushop) and maypushback:
            localwlock = pushop.repo.wlock()
        locallock = pushop.repo.lock()
        pushop.locallocked = True
    except IOError, err:
        pushop.locallocked = False
        if err.errno != errno.EACCES:
            raise
        # source repo cannot be locked.
        # We do not abort the push, but just disable the local phase
        # synchronisation.
        msg = 'cannot lock source repository: %s\n' % err
        pushop.ui.debug(msg)
    try:
        if pushop.locallocked:
            pushop.trmanager = transactionmanager(repo,
                                                  'push-response',
                                                  pushop.remote.url())
        pushop.repo.checkpush(pushop)
        lock = None
        unbundle = pushop.remote.capable('unbundle')
        if not unbundle:
            lock = pushop.remote.lock()
        try:
            _pushdiscovery(pushop)
            if _canusebundle2(pushop):
                _pushbundle2(pushop)
            _pushchangeset(pushop)
            _pushsyncphase(pushop)
            _pushobsolete(pushop)
            _pushbookmark(pushop)
        finally:
            if lock is not None:
                lock.release()
            if pushop.trmanager:
                pushop.trmanager.close()
    finally:
        if pushop.trmanager:
            pushop.trmanager.release()
        if locallock is not None:
            locallock.release()
        if localwlock is not None:
            localwlock.release()

    return pushop

# list of steps to perform discovery before push
pushdiscoveryorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
pushdiscoverymapping = {}

def pushdiscovery(stepname):
    """decorator for functions performing discovery before push

    The function is added to the step -> function mapping and appended to the
    list of steps. Beware that decorated functions will be added in order
    (this may matter).

    You can only use this decorator for a new step; if you want to wrap a
    step from an extension, change the pushdiscoverymapping dictionary
    directly."""
    def dec(func):
        assert stepname not in pushdiscoverymapping
        pushdiscoverymapping[stepname] = func
        pushdiscoveryorder.append(stepname)
        return func
    return dec
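
# Illustrative sketch (not part of the original module): registering a
# custom discovery step with the decorator above. The step name 'mydata'
# and the function body are hypothetical; real steps follow the same shape
# as the ones below.
#
#     @pushdiscovery('mydata')
#     def _pushdiscoverymydata(pushop):
#         # stash whatever later part generators will need on the pushop
#         pushop.mydata = pushop.repo.ui.config('myext', 'data')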

def _pushdiscovery(pushop):
    """Run all discovery steps"""
    for stepname in pushdiscoveryorder:
        step = pushdiscoverymapping[stepname]
        step(pushop)

@pushdiscovery('changeset')
def _pushdiscoverychangeset(pushop):
    """discover the changesets that need to be pushed"""
    fci = discovery.findcommonincoming
    commoninc = fci(pushop.repo, pushop.remote, force=pushop.force)
    common, inc, remoteheads = commoninc
    fco = discovery.findcommonoutgoing
    outgoing = fco(pushop.repo, pushop.remote, onlyheads=pushop.revs,
                   commoninc=commoninc, force=pushop.force)
    pushop.outgoing = outgoing
    pushop.remoteheads = remoteheads
    pushop.incoming = inc

@pushdiscovery('phase')
def _pushdiscoveryphase(pushop):
    """discover the phases that need to be pushed

    (computed for both the success and failure cases of the changesets
    push)"""
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    remotephases = pushop.remote.listkeys('phases')
    publishing = remotephases.get('publishing', False)
    if (pushop.ui.configbool('ui', '_usedassubrepo', False)
        and remotephases    # server supports phases
        and not pushop.outgoing.missing # no changesets to be pushed
        and publishing):
        # When:
        # - this is a subrepo push
        # - and the remote supports phases
        # - and no changesets are to be pushed
        # - and the remote is publishing
        # We may be in the issue 3871 case!
        # We drop the phase synchronisation that would otherwise be done as
        # a courtesy, which could publish changesets that are possibly
        # draft locally on the remote.
        remotephases = {'publishing': 'True'}
    ana = phases.analyzeremotephases(pushop.repo,
                                     pushop.fallbackheads,
                                     remotephases)
    pheads, droots = ana
    extracond = ''
    if not publishing:
        extracond = ' and public()'
    revset = 'heads((%%ln::%%ln) %s)' % extracond
    # Get the list of all revs draft on remote but public here.
    # XXX Beware that the revset breaks if droots are not strictly roots;
    # XXX we may want to ensure they are, but that is costly.
    fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
    if not outgoing.missing:
        future = fallback
    else:
        # add the changesets we are going to push as draft
        #
        # should not be necessary for a publishing server, but because of
        # an issue fixed in xxxxx we have to do it anyway.
        fdroots = list(unfi.set('roots(%ln + %ln::)',
                       outgoing.missing, droots))
        fdroots = [f.node() for f in fdroots]
        future = list(unfi.set(revset, fdroots, pushop.futureheads))
    pushop.outdatedphases = future
    pushop.fallbackoutdatedphases = fallback
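
# Illustrative note (not part of the original module): repo.set()/unfi.set()
# interpolate revset arguments, with '%ln' standing for a list of binary
# node ids. With a non-publishing remote, the template above therefore
# behaves like evaluating
#
#     heads((<droots>::<fallbackheads>) and public())
#
# over the unfiltered repository.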

@pushdiscovery('obsmarker')
def _pushdiscoveryobsmarkers(pushop):
    if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
        and pushop.repo.obsstore
        and 'obsolete' in pushop.remote.listkeys('namespaces')):
        repo = pushop.repo
        # very naive computation that can be quite expensive on big repos.
        # However: evolution is currently slow on them anyway.
        nodes = (c.node() for c in repo.set('::%ln', pushop.futureheads))
        pushop.outobsmarkers = pushop.repo.obsstore.relevantmarkers(nodes)

@pushdiscovery('bookmarks')
def _pushdiscoverybookmarks(pushop):
    ui = pushop.ui
    repo = pushop.repo.unfiltered()
    remote = pushop.remote
    ui.debug("checking for updated bookmarks\n")
    ancestors = ()
    if pushop.revs:
        revnums = map(repo.changelog.rev, pushop.revs)
        ancestors = repo.changelog.ancestors(revnums, inclusive=True)
    remotebookmark = remote.listkeys('bookmarks')

    explicit = set(pushop.bookmarks)

    comp = bookmod.compare(repo, repo._bookmarks, remotebookmark, srchex=hex)
    addsrc, adddst, advsrc, advdst, diverge, differ, invalid, same = comp
    for b, scid, dcid in advsrc:
        if b in explicit:
            explicit.remove(b)
        if not ancestors or repo[scid].rev() in ancestors:
            pushop.outbookmarks.append((b, dcid, scid))
    # search for added bookmarks
    for b, scid, dcid in addsrc:
        if b in explicit:
            explicit.remove(b)
        pushop.outbookmarks.append((b, '', scid))
    # search for overwritten bookmarks
    for b, scid, dcid in advdst + diverge + differ:
        if b in explicit:
            explicit.remove(b)
        pushop.outbookmarks.append((b, dcid, scid))
    # search for bookmarks to delete
    for b, scid, dcid in adddst:
        if b in explicit:
            explicit.remove(b)
        # treat as "deleted locally"
        pushop.outbookmarks.append((b, dcid, ''))
    # identical bookmarks shouldn't get reported
    for b, scid, dcid in same:
        if b in explicit:
            explicit.remove(b)

    if explicit:
        explicit = sorted(explicit)
        # we should probably list all of them
        ui.warn(_('bookmark %s does not exist on the local '
                  'or remote repository!\n') % explicit[0])
        pushop.bkresult = 2

    pushop.outbookmarks.sort()
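
# Illustrative note (not part of the original module): bookmod.compare
# returns an 8-tuple of lists of (bookmark, source-id, dest-id) entries.
# The reading of each bucket below is inferred from the handling above:
#
#     addsrc  - bookmark only exists locally            -> export
#     adddst  - bookmark only exists remotely           -> delete remotely
#     advsrc  - local side is a fast-forward of remote  -> plain update
#     advdst  - remote side is ahead of local           -> overwrite
#     diverge - both sides moved independently          -> overwrite
#     differ  - sides differ in some other way          -> overwrite
#     invalid - entries that could not be interpreted
#     same    - identical on both sides                 -> nothing to do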

def _pushcheckoutgoing(pushop):
    outgoing = pushop.outgoing
    unfi = pushop.repo.unfiltered()
    if not outgoing.missing:
        # nothing to push
        scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
        return False
    # something to push
    if not pushop.force:
        # if repo.obsstore is false there are no obsolete markers,
        # so we can skip the iteration entirely
        if unfi.obsstore:
            # these messages are defined here to stay within the 80-char
            # line length limit
            mso = _("push includes obsolete changeset: %s!")
            mst = {"unstable": _("push includes unstable changeset: %s!"),
                   "bumped": _("push includes bumped changeset: %s!"),
                   "divergent": _("push includes divergent changeset: %s!")}
            # If there is at least one obsolete or unstable changeset in
            # missing, at least one of the missing heads will be obsolete
            # or unstable. So checking heads only is ok.
            for node in outgoing.missingheads:
                ctx = unfi[node]
                if ctx.obsolete():
                    raise util.Abort(mso % ctx)
                elif ctx.troubled():
                    raise util.Abort(mst[ctx.troubles()[0]] % ctx)
        newbm = pushop.ui.configlist('bookmarks', 'pushing')
        discovery.checkheads(unfi, pushop.remote, outgoing,
                             pushop.remoteheads,
                             pushop.newbranch,
                             bool(pushop.incoming),
                             newbm)
    return True

# List of names of steps to perform for an outgoing bundle2, order matters.
b2partsgenorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
b2partsgenmapping = {}

def b2partsgenerator(stepname, idx=None):
    """decorator for functions generating bundle2 parts

    The function is added to the step -> function mapping and appended to the
    list of steps. Beware that decorated functions will be added in order
    (this may matter).

    You can only use this decorator for new steps; if you want to wrap a
    step from an extension, change the b2partsgenmapping dictionary
    directly."""
    def dec(func):
        assert stepname not in b2partsgenmapping
        b2partsgenmapping[stepname] = func
        if idx is None:
            b2partsgenorder.append(stepname)
        else:
            b2partsgenorder.insert(idx, stepname)
        return func
    return dec
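
# Illustrative sketch (not part of the original module): a part generator
# registered to run first via idx=0, returning a reply handler the same way
# _pushb2ctx below does. The part name 'myext:flag' and its payload are
# hypothetical.
#
#     @b2partsgenerator('myflag', idx=0)
#     def _pushb2myflag(pushop, bundler):
#         part = bundler.newpart('myext:flag', data='on')
#         def handlereply(op):
#             # inspect op.records.getreplies(part.id) here
#             pass
#         return handlereply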

@b2partsgenerator('changeset')
def _pushb2ctx(pushop, bundler):
    """handle changegroup push through bundle2

    addchangegroup result is stored in the ``pushop.cgresult`` attribute.
    """
    if 'changesets' in pushop.stepsdone:
        return
    pushop.stepsdone.add('changesets')
    # Send known heads to the server for race detection.
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop.repo,
                                     pushop.remote,
                                     pushop.outgoing)
    if not pushop.force:
        bundler.newpart('check:heads', data=iter(pushop.remoteheads))
    b2caps = bundle2.bundle2caps(pushop.remote)
    version = None
    cgversions = b2caps.get('changegroup')
    if not cgversions:  # 3.1 and 3.2 ship with an empty value
        cg = changegroup.getlocalchangegroupraw(pushop.repo, 'push',
                                                pushop.outgoing)
    else:
        cgversions = [v for v in cgversions if v in changegroup.packermap]
        if not cgversions:
            raise ValueError(_('no common changegroup version'))
        version = max(cgversions)
        cg = changegroup.getlocalchangegroupraw(pushop.repo, 'push',
                                                pushop.outgoing,
                                                version=version)
    cgpart = bundler.newpart('changegroup', data=cg)
    if version is not None:
        cgpart.addparam('version', version)
    def handlereply(op):
        """extract addchangegroup returns from server reply"""
        cgreplies = op.records.getreplies(cgpart.id)
        assert len(cgreplies['changegroup']) == 1
        pushop.cgresult = cgreplies['changegroup'][0]['return']
    return handlereply

@b2partsgenerator('phase')
def _pushb2phases(pushop, bundler):
    """handle phase push through bundle2"""
    if 'phases' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    if 'pushkey' not in b2caps:
        return
    pushop.stepsdone.add('phases')
    part2node = []

    def handlefailure(pushop, exc):
        targetid = int(exc.partid)
        for partid, node in part2node:
            if partid == targetid:
                raise error.Abort(_('updating %s to public failed') % node)

    enc = pushkey.encode
    for newremotehead in pushop.outdatedphases:
        part = bundler.newpart('pushkey')
        part.addparam('namespace', enc('phases'))
        part.addparam('key', enc(newremotehead.hex()))
        part.addparam('old', enc(str(phases.draft)))
        part.addparam('new', enc(str(phases.public)))
        part2node.append((part.id, newremotehead))
        pushop.pkfailcb[part.id] = handlefailure

    def handlereply(op):
        for partid, node in part2node:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            assert len(results) <= 1
            msg = None
            if not results:
                msg = _('server ignored update of %s to public!\n') % node
            elif not int(results[0]['return']):
                msg = _('updating %s to public failed!\n') % node
            if msg is not None:
                pushop.ui.warn(msg)
    return handlereply

@b2partsgenerator('obsmarkers')
def _pushb2obsmarkers(pushop, bundler):
    if 'obsmarkers' in pushop.stepsdone:
        return
    remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
    if obsolete.commonversion(remoteversions) is None:
        return
    pushop.stepsdone.add('obsmarkers')
    if pushop.outobsmarkers:
        markers = sorted(pushop.outobsmarkers)
        buildobsmarkerspart(bundler, markers)

@b2partsgenerator('bookmarks')
def _pushb2bookmarks(pushop, bundler):
    """handle bookmark push through bundle2"""
    if 'bookmarks' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    if 'pushkey' not in b2caps:
        return
    pushop.stepsdone.add('bookmarks')
    part2book = []
    enc = pushkey.encode

    def handlefailure(pushop, exc):
        targetid = int(exc.partid)
        for partid, book, action in part2book:
            if partid == targetid:
                raise error.Abort(bookmsgmap[action][1].rstrip() % book)
        # we should not be called for parts we did not generate
        assert False

    for book, old, new in pushop.outbookmarks:
        part = bundler.newpart('pushkey')
        part.addparam('namespace', enc('bookmarks'))
        part.addparam('key', enc(book))
        part.addparam('old', enc(old))
        part.addparam('new', enc(new))
        action = 'update'
        if not old:
            action = 'export'
        elif not new:
            action = 'delete'
        part2book.append((part.id, book, action))
        pushop.pkfailcb[part.id] = handlefailure

    def handlereply(op):
        ui = pushop.ui
        for partid, book, action in part2book:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            assert len(results) <= 1
            if not results:
                pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
            else:
                ret = int(results[0]['return'])
                if ret:
                    ui.status(bookmsgmap[action][0] % book)
                else:
                    ui.warn(bookmsgmap[action][1] % book)
                    if pushop.bkresult is not None:
                        pushop.bkresult = 1
    return handlereply


def _pushbundle2(pushop):
    """push data to the remote using bundle2

    The only currently supported type of data is changegroup but this will
    evolve in the future."""
    bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
    pushback = (pushop.trmanager
                and pushop.ui.configbool('experimental', 'bundle2.pushback'))

    # create reply capability
    capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo,
                                                      allowpushback=pushback))
    bundler.newpart('replycaps', data=capsblob)
    replyhandlers = []
    for partgenname in b2partsgenorder:
        partgen = b2partsgenmapping[partgenname]
        ret = partgen(pushop, bundler)
        if callable(ret):
            replyhandlers.append(ret)
    # do not push if nothing to push
    if bundler.nbparts <= 1:
        return
    stream = util.chunkbuffer(bundler.getchunks())
    try:
        try:
            reply = pushop.remote.unbundle(stream, ['force'], 'push')
        except error.BundleValueError, exc:
            raise util.Abort('missing support for %s' % exc)
        try:
            trgetter = None
            if pushback:
                trgetter = pushop.trmanager.transaction
            op = bundle2.processbundle(pushop.repo, reply, trgetter)
        except error.BundleValueError, exc:
            raise util.Abort('missing support for %s' % exc)
    except error.PushkeyFailed, exc:
        partid = int(exc.partid)
        if partid not in pushop.pkfailcb:
            raise
        pushop.pkfailcb[partid](pushop, exc)
    for rephand in replyhandlers:
        rephand(op)

def _pushchangeset(pushop):
    """Make the actual push of changeset bundle to remote repo"""
    if 'changesets' in pushop.stepsdone:
        return
    pushop.stepsdone.add('changesets')
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop.repo,
                                     pushop.remote,
                                     pushop.outgoing)
    outgoing = pushop.outgoing
    unbundle = pushop.remote.capable('unbundle')
    # TODO: get bundlecaps from remote
    bundlecaps = None
    # create a changegroup from local
    if pushop.revs is None and not (outgoing.excluded
                                    or pushop.repo.changelog.filteredrevs):
        # push everything,
        # use the fast path, no race possible on push
        bundler = changegroup.cg1packer(pushop.repo, bundlecaps)
        cg = changegroup.getsubset(pushop.repo,
                                   outgoing,
                                   bundler,
                                   'push',
                                   fastpath=True)
    else:
        cg = changegroup.getlocalchangegroup(pushop.repo, 'push', outgoing,
                                             bundlecaps)

    # apply changegroup to remote
    if unbundle:
        # local repo finds heads on server, finds out what
        # revs it must push. once revs transferred, if server
        # finds it has different heads (someone else won
        # commit/push race), server aborts.
        if pushop.force:
            remoteheads = ['force']
        else:
            remoteheads = pushop.remoteheads
        # ssh: return remote's addchangegroup()
        # http: return remote's addchangegroup() or 0 for error
        pushop.cgresult = pushop.remote.unbundle(cg, remoteheads,
                                                 pushop.repo.url())
    else:
        # we return an integer indicating remote head count
        # change
        pushop.cgresult = pushop.remote.addchangegroup(cg, 'push',
                                                       pushop.repo.url())

def _pushsyncphase(pushop):
    """synchronise phase information locally and remotely"""
    cheads = pushop.commonheads
    # even when we don't push, exchanging phase data is useful
    remotephases = pushop.remote.listkeys('phases')
    if (pushop.ui.configbool('ui', '_usedassubrepo', False)
        and remotephases    # server supports phases
        and pushop.cgresult is None # nothing was pushed
        and remotephases.get('publishing', False)):
        # When:
        # - this is a subrepo push
        # - and the remote supports phases
        # - and no changeset was pushed
        # - and the remote is publishing
        # We may be in the issue 3871 case!
        # We drop the phase synchronisation that would otherwise be done as
        # a courtesy, which could publish changesets that are possibly
        # draft locally on the remote.
        remotephases = {'publishing': 'True'}
    if not remotephases: # old server, or public-only reply from non-publishing
        _localphasemove(pushop, cheads)
        # don't push any phase data as there is nothing to push
    else:
        ana = phases.analyzeremotephases(pushop.repo, cheads,
                                         remotephases)
        pheads, droots = ana
        ### Apply remote phase on local
        if remotephases.get('publishing', False):
            _localphasemove(pushop, cheads)
        else: # publish = False
            _localphasemove(pushop, pheads)
            _localphasemove(pushop, cheads, phases.draft)
        ### Apply local phase on remote

        if pushop.cgresult:
            if 'phases' in pushop.stepsdone:
                # phases already pushed through bundle2
                return
            outdated = pushop.outdatedphases
        else:
            outdated = pushop.fallbackoutdatedphases

        pushop.stepsdone.add('phases')

        # filter heads already turned public by the push
        outdated = [c for c in outdated if c.node() not in pheads]
        # fallback to independent pushkey command
        for newremotehead in outdated:
            r = pushop.remote.pushkey('phases',
                                      newremotehead.hex(),
                                      str(phases.draft),
                                      str(phases.public))
            if not r:
                pushop.ui.warn(_('updating %s to public failed!\n')
                               % newremotehead)

def _localphasemove(pushop, nodes, phase=phases.public):
    """move <nodes> to <phase> in the local source repo"""
    if pushop.trmanager:
        phases.advanceboundary(pushop.repo,
                               pushop.trmanager.transaction(),
                               phase,
                               nodes)
    else:
        # repo is not locked, do not change any phases!
        # Inform the user that phases should have been moved when
        # applicable.
        actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
        phasestr = phases.phasenames[phase]
        if actualmoves:
            pushop.ui.status(_('cannot lock source repo, skipping '
                               'local %s phase update\n') % phasestr)

def _pushobsolete(pushop):
    """utility function to push obsolete markers to a remote"""
    if 'obsmarkers' in pushop.stepsdone:
        return
    repo = pushop.repo
    remote = pushop.remote
    pushop.stepsdone.add('obsmarkers')
    if pushop.outobsmarkers:
        pushop.ui.debug('try to push obsolete markers to remote\n')
        rslts = []
        remotedata = obsolete._pushkeyescape(sorted(pushop.outobsmarkers))
        for key in sorted(remotedata, reverse=True):
            # reverse sort to ensure we end with dump0
            data = remotedata[key]
            rslts.append(remote.pushkey('obsolete', key, '', data))
        if [r for r in rslts if not r]:
            msg = _('failed to push some obsolete markers!\n')
            repo.ui.warn(msg)

def _pushbookmark(pushop):
    """Update bookmark position on remote"""
    if pushop.cgresult == 0 or 'bookmarks' in pushop.stepsdone:
        return
    pushop.stepsdone.add('bookmarks')
    ui = pushop.ui
    remote = pushop.remote

    for b, old, new in pushop.outbookmarks:
        action = 'update'
        if not old:
            action = 'export'
        elif not new:
            action = 'delete'
        if remote.pushkey('bookmarks', b, old, new):
            ui.status(bookmsgmap[action][0] % b)
        else:
            ui.warn(bookmsgmap[action][1] % b)
            # discovery can have set the value for an invalid entry
            if pushop.bkresult is not None:
                pushop.bkresult = 1

class pulloperation(object):
    """An object that represents a single pull operation

    Its purpose is to carry pull-related state and very common operations.

    A new pulloperation should be created at the beginning of each pull and
    discarded afterward.
    """

    def __init__(self, repo, remote, heads=None, force=False, bookmarks=(),
                 remotebookmarks=None):
        # repo we pull into
        self.repo = repo
        # repo we pull from
        self.remote = remote
        # revision we try to pull (None is "all")
        self.heads = heads
        # bookmarks pulled explicitly
        self.explicitbookmarks = bookmarks
        # do we force pull?
        self.force = force
        # transaction manager
        self.trmanager = None
        # set of common changesets between local and remote before pull
        self.common = None
        # set of pulled heads
        self.rheads = None
        # list of missing changesets to fetch remotely
        self.fetch = None
        # remote bookmarks data
        self.remotebookmarks = remotebookmarks
        # result of changegroup pulling (used as return code by pull)
        self.cgresult = None
        # list of steps already done
        self.stepsdone = set()

    @util.propertycache
    def pulledsubset(self):
        """heads of the set of changesets targeted by the pull"""
        # compute target subset
        if self.heads is None:
            # We pulled everything possible
            # sync on everything common
            c = set(self.common)
            ret = list(self.common)
            for n in self.rheads:
                if n not in c:
                    ret.append(n)
            return ret
        else:
            # We pulled a specific subset
            # sync on this subset
            return self.heads

    def gettransaction(self):
        # deprecated; talk to trmanager directly
        return self.trmanager.transaction()

class transactionmanager(object):
    """An object to manage the life cycle of a transaction

    It creates the transaction on demand and calls the appropriate hooks when
    closing the transaction."""
    def __init__(self, repo, source, url):
        self.repo = repo
        self.source = source
        self.url = url
        self._tr = None

    def transaction(self):
        """Return an open transaction object, constructing if necessary"""
        if not self._tr:
            trname = '%s\n%s' % (self.source, util.hidepassword(self.url))
            self._tr = self.repo.transaction(trname)
            self._tr.hookargs['source'] = self.source
            self._tr.hookargs['url'] = self.url
        return self._tr

    def close(self):
        """close transaction if created"""
        if self._tr is not None:
            self._tr.close()

    def release(self):
        """release transaction if created"""
        if self._tr is not None:
            self._tr.release()
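
# Illustrative sketch (not part of the original module): the typical
# transactionmanager life cycle, mirroring how pull() below drives it.
# 'repo' and 'remoteurl' are assumed to come from the caller.
#
#     trmanager = transactionmanager(repo, 'pull', remoteurl)
#     try:
#         tr = trmanager.transaction()  # created lazily on first use
#         # ... apply incoming data under 'tr' ...
#         trmanager.close()             # commit, if a transaction was opened
#     finally:
#         trmanager.release()           # abort unless close() ran first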

def pull(repo, remote, heads=None, force=False, bookmarks=(), opargs=None):
    if opargs is None:
        opargs = {}
    pullop = pulloperation(repo, remote, heads, force, bookmarks=bookmarks,
                           **opargs)
    if pullop.remote.local():
        missing = set(pullop.remote.requirements) - pullop.repo.supported
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    lock = pullop.repo.lock()
    try:
        pullop.trmanager = transactionmanager(repo, 'pull', remote.url())
        _pulldiscovery(pullop)
        if _canusebundle2(pullop):
            _pullbundle2(pullop)
        _pullchangeset(pullop)
        _pullphase(pullop)
        _pullbookmarks(pullop)
        _pullobsolete(pullop)
        pullop.trmanager.close()
    finally:
        pullop.trmanager.release()
        lock.release()

    return pullop
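
# Illustrative sketch (not part of the original module): a minimal caller of
# pull(), assuming 'repo' is a local repository object; the URL is made up.
#
#     from mercurial import hg, exchange
#
#     other = hg.peer(repo, {}, 'https://example.com/repo')
#     pullop = exchange.pull(repo, other, heads=None, force=False)
#     if pullop.cgresult == 0:
#         repo.ui.status('no changesets were pulled\n')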

# list of steps to perform discovery before pull
pulldiscoveryorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
pulldiscoverymapping = {}

def pulldiscovery(stepname):
    """decorator for functions performing discovery before pull

    The function is added to the step -> function mapping and appended to the
    list of steps. Beware that decorated functions will be added in order
    (this may matter).

    You can only use this decorator for a new step; if you want to wrap a
    step from an extension, change the pulldiscoverymapping dictionary
    directly."""
    def dec(func):
        assert stepname not in pulldiscoverymapping
        pulldiscoverymapping[stepname] = func
        pulldiscoveryorder.append(stepname)
        return func
    return dec

def _pulldiscovery(pullop):
    """Run all discovery steps"""
    for stepname in pulldiscoveryorder:
        step = pulldiscoverymapping[stepname]
        step(pullop)

@pulldiscovery('b1:bookmarks')
def _pullbookmarkbundle1(pullop):
    """fetch bookmark data in bundle1 case

    If not using bundle2, we have to fetch bookmarks before changeset
    discovery to reduce the chance and impact of race conditions."""
    if pullop.remotebookmarks is not None:
        return
    if (_canusebundle2(pullop)
            and 'listkeys' in bundle2.bundle2caps(pullop.remote)):
        # all known bundle2 servers now support listkeys, but let's be nice
        # with new implementations.
        return
    pullop.remotebookmarks = pullop.remote.listkeys('bookmarks')

@pulldiscovery('changegroup')
def _pulldiscoverychangegroup(pullop):
    """discovery phase for the pull

    Currently handles changeset discovery only; it will be changed to
    handle all discovery at some point."""
    tmp = discovery.findcommonincoming(pullop.repo,
                                       pullop.remote,
                                       heads=pullop.heads,
                                       force=pullop.force)
    common, fetch, rheads = tmp
    nm = pullop.repo.unfiltered().changelog.nodemap
    if fetch and rheads:
        # If a remote head is filtered locally, let's drop it from the
        # unknown remote heads and put it back in common.
        #
        # This is a hackish solution to catch most of the "common but
        # locally hidden" situations. We do not perform discovery on the
        # unfiltered repository because it ends up doing a pathological
        # amount of round trips for a huge amount of changesets we do not
        # care about.
        #
        # If a set of such "common but filtered" changesets exists on the
        # server but does not include a remote head, we'll not be able to
        # detect it.
        scommon = set(common)
        filteredrheads = []
        for n in rheads:
            if n in nm:
                if n not in scommon:
                    common.append(n)
            else:
                filteredrheads.append(n)
        if not filteredrheads:
            fetch = []
        rheads = filteredrheads
    pullop.common = common
    pullop.fetch = fetch
    pullop.rheads = rheads

def _pullbundle2(pullop):
    """pull data using bundle2

    For now, the only supported data are changegroups."""
    remotecaps = bundle2.bundle2caps(pullop.remote)
    kwargs = {'bundlecaps': caps20to10(pullop.repo)}
    # pulling changegroup
    pullop.stepsdone.add('changegroup')

    kwargs['common'] = pullop.common
    kwargs['heads'] = pullop.heads or pullop.rheads
    kwargs['cg'] = pullop.fetch
    if 'listkeys' in remotecaps:
        kwargs['listkeys'] = ['phase']
        if pullop.remotebookmarks is None:
            # make sure to always include bookmark data when migrating
            # `hg incoming --bundle` to using this function.
            kwargs['listkeys'].append('bookmarks')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
    else:
        if pullop.heads is None and list(pullop.common) == [nullid]:
            pullop.repo.ui.status(_("requesting all changes\n"))
    if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
        remoteversions = bundle2.obsmarkersversion(remotecaps)
        if obsolete.commonversion(remoteversions) is not None:
            kwargs['obsmarkers'] = True
            pullop.stepsdone.add('obsmarkers')
    _pullbundle2extraprepare(pullop, kwargs)
    bundle = pullop.remote.getbundle('pull', **kwargs)
    try:
        op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
    except error.BundleValueError, exc:
        raise util.Abort('missing support for %s' % exc)

    if pullop.fetch:
        results = [cg['return'] for cg in op.records['changegroup']]
        pullop.cgresult = changegroup.combineresults(results)

    # processing phases change
    for namespace, value in op.records['listkeys']:
        if namespace == 'phases':
            _pullapplyphases(pullop, value)

    # processing bookmark update
    for namespace, value in op.records['listkeys']:
        if namespace == 'bookmarks':
            pullop.remotebookmarks = value

    # bookmark data were either already there or pulled in the bundle
    if pullop.remotebookmarks is not None:
        _pullbookmarks(pullop)
1084
1084
1085 def _pullbundle2extraprepare(pullop, kwargs):
1085 def _pullbundle2extraprepare(pullop, kwargs):
1086 """hook function so that extensions can extend the getbundle call"""
1086 """hook function so that extensions can extend the getbundle call"""
1087 pass
1087 pass
1088
1088
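# Illustrative sketch, not part of the original file: an extension could
# extend the getbundle call by wrapping the hook above. This assumes the
# module is mercurial's exchange module; the 'mybundlearg' argument and the
# _mypullprepare helper are hypothetical.
#
#     from mercurial import exchange, extensions
#
#     def _mypullprepare(orig, pullop, kwargs):
#         kwargs['mybundlearg'] = True  # hypothetical extra getbundle argument
#         return orig(pullop, kwargs)
#
#     def extsetup(ui):
#         extensions.wrapfunction(exchange, '_pullbundle2extraprepare',
#                                 _mypullprepare)
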
def _pullchangeset(pullop):
    """pull changeset from unbundle into the local repo"""
    # We delay the open of the transaction as late as possible so we don't
    # open a transaction for nothing and don't break a future useful
    # rollback call
    if 'changegroup' in pullop.stepsdone:
        return
    pullop.stepsdone.add('changegroup')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
        return
    pullop.gettransaction()
    if pullop.heads is None and list(pullop.common) == [nullid]:
        pullop.repo.ui.status(_("requesting all changes\n"))
    elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
        # issue1320, avoid a race if remote changed after discovery
        pullop.heads = pullop.rheads

    if pullop.remote.capable('getbundle'):
        # TODO: get bundlecaps from remote
        cg = pullop.remote.getbundle('pull', common=pullop.common,
                                     heads=pullop.heads or pullop.rheads)
    elif pullop.heads is None:
        cg = pullop.remote.changegroup(pullop.fetch, 'pull')
    elif not pullop.remote.capable('changegroupsubset'):
        raise util.Abort(_("partial pull cannot be done because "
                           "other repository doesn't support "
                           "changegroupsubset."))
    else:
        cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
    pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
                                                 pullop.remote.url())

def _pullphase(pullop):
    # Get remote phases data from the remote
    if 'phases' in pullop.stepsdone:
        return
    remotephases = pullop.remote.listkeys('phases')
    _pullapplyphases(pullop, remotephases)

def _pullapplyphases(pullop, remotephases):
    """apply phase movement from observed remote state"""
    if 'phases' in pullop.stepsdone:
        return
    pullop.stepsdone.add('phases')
    publishing = bool(remotephases.get('publishing', False))
    if remotephases and not publishing:
        # remote is new and unpublishing
        pheads, _dr = phases.analyzeremotephases(pullop.repo,
                                                 pullop.pulledsubset,
                                                 remotephases)
        dheads = pullop.pulledsubset
    else:
        # Remote is old or publishing: all common changesets
        # should be seen as public
        pheads = pullop.pulledsubset
        dheads = []
    unfi = pullop.repo.unfiltered()
    phase = unfi._phasecache.phase
    rev = unfi.changelog.nodemap.get
    public = phases.public
    draft = phases.draft

    # exclude changesets already public locally and update the others
    pheads = [pn for pn in pheads if phase(unfi, rev(pn)) > public]
    if pheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, public, pheads)

    # exclude changesets already draft locally and update the others
    dheads = [pn for pn in dheads if phase(unfi, rev(pn)) > draft]
    if dheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, draft, dheads)

def _pullbookmarks(pullop):
    """process the remote bookmark information to update the local one"""
    if 'bookmarks' in pullop.stepsdone:
        return
    pullop.stepsdone.add('bookmarks')
    repo = pullop.repo
    remotebookmarks = pullop.remotebookmarks
    bookmod.updatefromremote(repo.ui, repo, remotebookmarks,
                             pullop.remote.url(),
                             pullop.gettransaction,
                             explicit=pullop.explicitbookmarks)

def _pullobsolete(pullop):
    """utility function to pull obsolete markers from a remote

    The `gettransaction` function returns the pull transaction, creating one
    if necessary. We return the transaction to inform the calling code that
    a new transaction has been created (when applicable).

    Exists mostly to allow overriding for experimentation purposes"""
    if 'obsmarkers' in pullop.stepsdone:
        return
    pullop.stepsdone.add('obsmarkers')
    tr = None
    if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
        pullop.repo.ui.debug('fetching remote obsolete markers\n')
        remoteobs = pullop.remote.listkeys('obsolete')
        if 'dump0' in remoteobs:
            tr = pullop.gettransaction()
            for key in sorted(remoteobs, reverse=True):
                if key.startswith('dump'):
                    data = base85.b85decode(remoteobs[key])
                    pullop.repo.obsstore.mergemarkers(tr, data)
            pullop.repo.invalidatevolatilesets()
    return tr

def caps20to10(repo):
    """return a set with appropriate options to use bundle20 during getbundle"""
    caps = set(['HG20'])
    capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
    caps.add('bundle2=' + urllib.quote(capsblob))
    return caps

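# Illustrative sketch, not part of the original file: the set returned by
# caps20to10() looks roughly like the following. The exact urlquoted blob
# depends on the repository's advertised bundle2 capabilities, so the value
# shown is abridged and hypothetical.
#
#     caps = caps20to10(repo)
#     # set(['HG20', 'bundle2=HG20%0Achangegroup%3D01...'])
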
# List of names of steps to perform for a bundle2 for getbundle, order matters.
getbundle2partsorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
getbundle2partsmapping = {}

def getbundle2partsgenerator(stepname, idx=None):
    """decorator for functions generating a bundle2 part for getbundle

    The function is added to the step -> function mapping and appended to the
    list of steps. Beware that decorated functions will be added in order
    (this may matter).

    You can only use this decorator for new steps; if you want to wrap a step
    from an extension, modify the getbundle2partsmapping dictionary directly."""
    def dec(func):
        assert stepname not in getbundle2partsmapping
        getbundle2partsmapping[stepname] = func
        if idx is None:
            getbundle2partsorder.append(stepname)
        else:
            getbundle2partsorder.insert(idx, stepname)
        return func
    return dec

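# Illustrative sketch, not part of the original file: registering a new part
# generator with the decorator above. The 'myparttype' step name and payload
# are hypothetical; a real generator receives the same keyword arguments as
# the built-in generators defined later in this file.
#
#     @getbundle2partsgenerator('myparttype')
#     def _getbundlemypart(bundler, repo, source, bundlecaps=None,
#                          b2caps=None, **kwargs):
#         if 'myparttype' in b2caps:  # only send if the client supports it
#             bundler.newpart('myparttype', data='hypothetical payload')
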
def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
              **kwargs):
    """return a full bundle (with potentially multiple kinds of parts)

    Could be a bundle HG10 or a bundle HG20 depending on the bundlecaps
    passed. For now, the bundle can contain only changegroup, but this will
    change when more part types become available for bundle2.

    This is different from changegroup.getchangegroup that only returns an HG10
    changegroup bundle. They may eventually get reunited in the future when we
    have a clearer idea of the API we want to use to query different data.

    The implementation is at a very early stage and will get massive rework
    when the API of bundle is refined.
    """
    # bundle10 case
    usebundle2 = False
    if bundlecaps is not None:
        usebundle2 = any((cap.startswith('HG2') for cap in bundlecaps))
    if not usebundle2:
        if bundlecaps and not kwargs.get('cg', True):
            raise ValueError(_('request for bundle10 must include changegroup'))

        if kwargs:
            raise ValueError(_('unsupported getbundle arguments: %s')
                             % ', '.join(sorted(kwargs.keys())))
        return changegroup.getchangegroup(repo, source, heads=heads,
                                          common=common, bundlecaps=bundlecaps)

    # bundle20 case
    b2caps = {}
    for bcaps in bundlecaps:
        if bcaps.startswith('bundle2='):
            blob = urllib.unquote(bcaps[len('bundle2='):])
            b2caps.update(bundle2.decodecaps(blob))
    bundler = bundle2.bundle20(repo.ui, b2caps)

    kwargs['heads'] = heads
    kwargs['common'] = common

    for name in getbundle2partsorder:
        func = getbundle2partsmapping[name]
        func(bundler, repo, source, bundlecaps=bundlecaps, b2caps=b2caps,
             **kwargs)

    return util.chunkbuffer(bundler.getchunks())

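# Illustrative sketch, not part of the original file: the same getbundle()
# call yields an HG10 or HG20 stream depending solely on the advertised
# bundlecaps. Argument values below are hypothetical.
#
#     # bundle10: no 'HG2X' capability advertised, changegroup only
#     chunks = getbundle(repo, 'pull', heads=None, common=[nullid])
#
#     # bundle20: advertise HG20 (e.g. via caps20to10()) and request extras
#     chunks = getbundle(repo, 'pull', heads=None, common=[nullid],
#                        bundlecaps=caps20to10(repo),
#                        cg=True, listkeys=['phase', 'bookmarks'])
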
@getbundle2partsgenerator('changegroup')
def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
                              b2caps=None, heads=None, common=None, **kwargs):
    """add a changegroup part to the requested bundle"""
    cg = None
    if kwargs.get('cg', True):
        # build changegroup bundle here.
        version = None
        cgversions = b2caps.get('changegroup')
        getcgkwargs = {}
        if cgversions:  # 3.1 and 3.2 ship with an empty value
            cgversions = [v for v in cgversions if v in changegroup.packermap]
            if not cgversions:
                raise ValueError(_('no common changegroup version'))
            version = getcgkwargs['version'] = max(cgversions)
        outgoing = changegroup.computeoutgoing(repo, heads, common)
        cg = changegroup.getlocalchangegroupraw(repo, source, outgoing,
                                                bundlecaps=bundlecaps,
                                                **getcgkwargs)

    if cg:
        part = bundler.newpart('changegroup', data=cg)
        if version is not None:
            part.addparam('version', version)
        part.addparam('nbchanges', str(len(outgoing.missing)), mandatory=False)

@getbundle2partsgenerator('listkeys')
def _getbundlelistkeysparts(bundler, repo, source, bundlecaps=None,
                            b2caps=None, **kwargs):
    """add parts containing listkeys namespaces to the requested bundle"""
    listkeys = kwargs.get('listkeys', ())
    for namespace in listkeys:
        part = bundler.newpart('listkeys')
        part.addparam('namespace', namespace)
        keys = repo.listkeys(namespace).items()
        part.data = pushkey.encodekeys(keys)

@getbundle2partsgenerator('obsmarkers')
def _getbundleobsmarkerpart(bundler, repo, source, bundlecaps=None,
                            b2caps=None, heads=None, **kwargs):
    """add an obsolescence markers part to the requested bundle"""
    if kwargs.get('obsmarkers', False):
        if heads is None:
            heads = repo.heads()
        subset = [c.node() for c in repo.set('::%ln', heads)]
        markers = repo.obsstore.relevantmarkers(subset)
        markers = sorted(markers)
        buildobsmarkerspart(bundler, markers)

@getbundle2partsgenerator('hgtagsfnodes')
def _getbundletagsfnodes(bundler, repo, source, bundlecaps=None,
                         b2caps=None, heads=None, common=None,
                         **kwargs):
    """Transfer the .hgtags filenodes mapping.

    Only values for heads in this bundle will be transferred.

    The part data consists of pairs of a 20-byte changeset node and the
    corresponding raw .hgtags filenode value.
    """
    # Don't send unless:
    # - changesets are being exchanged,
    # - the client supports it.
    if not (kwargs.get('cg', True) and 'hgtagsfnodes' in b2caps):
        return

    outgoing = changegroup.computeoutgoing(repo, heads, common)

    if not outgoing.missingheads:
        return

    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

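# Illustrative sketch, not part of the original file: a receiver could decode
# the fixed-width part data described above as consecutive 40-byte records,
# each a 20-byte changeset node followed by a 20-byte .hgtags filenode.
# _decodefnodes is a hypothetical helper, not a Mercurial API.
#
#     def _decodefnodes(data):
#         assert len(data) % 40 == 0
#         for offset in xrange(0, len(data), 40):
#             yield data[offset:offset + 20], data[offset + 20:offset + 40]
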
def check_heads(repo, their_heads, context):
    """check if the heads of a repo have been modified

    Used by peer for unbundling.
    """
    heads = repo.heads()
    heads_hash = util.sha1(''.join(sorted(heads))).digest()
    if not (their_heads == ['force'] or their_heads == heads or
            their_heads == ['hashed', heads_hash]):
        # someone else committed/pushed/unbundled while we
        # were transferring data
        raise error.PushRaced('repository changed while %s - '
                              'please try again' % context)

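# Illustrative sketch, not part of the original file: a client can guard
# against push races by sending the 'hashed' form checked above, computed
# over the remote heads it observed during discovery. 'remote' stands for
# any peer object; the variable names are hypothetical.
#
#     remoteheads = remote.heads()
#     hashed = ['hashed', util.sha1(''.join(sorted(remoteheads))).digest()]
#     # check_heads(repo, hashed, 'uploading changes') then succeeds only if
#     # the server's heads are still exactly 'remoteheads'.
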
def unbundle(repo, cg, heads, source, url):
    """Apply a bundle to a repo.

    This function makes sure the repo is locked during the application and
    has a mechanism to check that no push race occurred between the creation
    of the bundle and its application.

    If the push was raced, a PushRaced exception is raised."""
    r = 0
    # need a transaction when processing a bundle2 stream
    wlock = lock = tr = None
    recordout = None
    # quick fix for output mismatch with bundle2 in 3.4
    captureoutput = repo.ui.configbool('experimental', 'bundle2-output-capture',
                                       False)
    if url.startswith('remote:http:') or url.startswith('remote:https:'):
        captureoutput = True
    try:
        check_heads(repo, heads, 'uploading changes')
        # push can proceed
        if util.safehasattr(cg, 'params'):
            r = None
            try:
                wlock = repo.wlock()
                lock = repo.lock()
                tr = repo.transaction(source)
                tr.hookargs['source'] = source
                tr.hookargs['url'] = url
                tr.hookargs['bundle2'] = '1'
                op = bundle2.bundleoperation(repo, lambda: tr,
                                             captureoutput=captureoutput)
                try:
                    r = bundle2.processbundle(repo, cg, op=op)
                finally:
                    r = op.reply
                    if captureoutput and r is not None:
                        repo.ui.pushbuffer(error=True, subproc=True)
                        def recordout(output):
                            r.newpart('output', data=output, mandatory=False)
                tr.close()
            except BaseException, exc:
                exc.duringunbundle2 = True
                if captureoutput and r is not None:
                    parts = exc._bundle2salvagedoutput = r.salvageoutput()
                    def recordout(output):
                        part = bundle2.bundlepart('output', data=output,
                                                  mandatory=False)
                        parts.append(part)
                raise
        else:
            lock = repo.lock()
            r = changegroup.addchangegroup(repo, cg, source, url)
    finally:
        lockmod.release(tr, lock, wlock)
        if recordout is not None:
            recordout(repo.ui.popbuffer())
    return r

# This is its own function so extensions can override it.
def _walkstreamfiles(repo):
    return repo.store.walk()

def generatestreamclone(repo):
    """Emit content for a streaming clone.

    This is a generator of raw chunks that constitute a streaming clone.

    The stream begins with a line of 2 space-delimited integers containing
    the number of entries and the total byte size.

    Next are N entries, one for each file being transferred. Each file entry
    starts as a line with the file name and integer size delimited by a null
    byte. The raw file data follows. Following the raw file data is the next
    file entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    lock = repo.lock()
    try:
        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            if size:
                entries.append((name, size))
                total_bytes += size
    finally:
        lock.release()

    repo.ui.debug('%d files, %d bytes to transfer\n' %
                  (len(entries), total_bytes))
    yield '%d %d\n' % (len(entries), total_bytes)

    sopener = repo.svfs
    oldaudit = sopener.mustaudit
    debugflag = repo.ui.debugflag
    sopener.mustaudit = False

    try:
        for name, size in entries:
            if debugflag:
                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield '%s\0%d\n' % (store.encodedir(name), size)
            if size <= 65536:
                fp = sopener(name)
                try:
                    data = fp.read(size)
                finally:
                    fp.close()
                yield data
            else:
                for chunk in util.filechunkiter(sopener(name), limit=size):
                    yield chunk
    finally:
        sopener.mustaudit = oldaudit

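# Illustrative sketch, not part of the original file: the wire layout emitted
# by generatestreamclone() for a hypothetical two-file store (file names and
# sizes invented for the example):
#
#     2 115\n                 <- number of entries, total byte size
#     data/foo.i\x0098\n      <- file name, NUL byte, size; raw data follows
#     <98 bytes of raw revlog data>
#     00manifest.i\x0017\n
#     <17 bytes of raw revlog data>
#
# consumestreamclone() below is the matching reader for this layout.
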
def consumestreamclone(repo, fp):
    """Apply the contents from a streaming clone file.

    This takes the output from "generatestreamclone" and applies it to the
    specified repository.

    As with "generatestreamclone", the status line added by the wire protocol
    is not handled by this function.
    """
    lock = repo.lock()
    try:
        repo.ui.status(_('streaming all changes\n'))
        l = fp.readline()
        try:
            total_files, total_bytes = map(int, l.split(' ', 1))
        except (ValueError, TypeError):
            raise error.ResponseError(
                _('unexpected response from remote server:'), l)
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (total_files, util.bytecount(total_bytes)))
        handled_bytes = 0
        repo.ui.progress(_('clone'), 0, total=total_bytes)
        start = time.time()

        tr = repo.transaction(_('clone'))
        try:
            for i in xrange(total_files):
                # XXX doesn't support '\n' or '\r' in filenames
                l = fp.readline()
                try:
                    name, size = l.split('\0', 1)
                    size = int(size)
                except (ValueError, TypeError):
                    raise error.ResponseError(
                        _('unexpected response from remote server:'), l)
                if repo.ui.debugflag:
                    repo.ui.debug('adding %s (%s)\n' %
                                  (name, util.bytecount(size)))
                # for backwards compat, name was partially encoded
                ofp = repo.svfs(store.decodedir(name), 'w')
                for chunk in util.filechunkiter(fp, limit=size):
                    handled_bytes += len(chunk)
                    repo.ui.progress(_('clone'), handled_bytes,
                                     total=total_bytes)
                    ofp.write(chunk)
                ofp.close()
            tr.close()
        finally:
            tr.release()

        # Writing straight to files circumvented the in-memory caches
        repo.invalidate()

        elapsed = time.time() - start
        if elapsed <= 0:
            elapsed = 0.001
        repo.ui.progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(total_bytes), elapsed,
                        util.bytecount(total_bytes / elapsed)))
    finally:
        lock.release()