bundle2: store the salvaged output on the exception object...
Pierre-Yves David
r24795:f9aa4cb8 default
@@ -1,1260 +1,1264 @@
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is architected as follows:

 - magic string
 - stream level parameters
 - payload parts (any number)
 - end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

Stream level parameters
------------------------

The binary format is as follows:

:params size: int32

  The total number of bytes used by the parameters.

:params value: arbitrary number of bytes

  A blob of `params size` containing the serialized version of all stream
  level parameters.

  The blob contains a space separated list of parameters. Parameters with a
  value are stored in the form `<name>=<value>`. Both name and value are
  urlquoted.

  Empty names are obviously forbidden.

  A name MUST start with a letter. If this first letter is lower case, the
  parameter is advisory and can be safely ignored. However, when the first
  letter is capital, the parameter is mandatory and the bundling process MUST
  stop if it is not able to process it.

  Stream parameters use a simple textual format for two main reasons:

  - Stream level parameters should remain simple and we want to discourage
    any crazy usage.
  - Textual data allows easy human inspection of a bundle2 header in case of
    trouble.

  Any applicative level options MUST go into a bundle2 part instead.

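For illustration, a minimal encoder for this layer could look like this (an
editor's sketch, not part of the original text; the helper name is made up)::

    import struct
    import urllib

    def encodestreamparams(params):
        # params is a list of (name, value-or-None) pairs
        blocks = []
        for name, value in params:
            name = urllib.quote(name)
            if value is not None:
                name = '%s=%s' % (name, urllib.quote(value))
            blocks.append(name)
        blob = ' '.join(blocks)
        # int32 size prefix followed by the textual blob
        return struct.pack('>i', len(blob)) + blob
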
Payload part
------------------------

The binary format is as follows:

:header size: int32

  The total number of bytes used by the part headers. When the header is
  empty (size = 0) this is interpreted as the end of stream marker.

:header:

    The header defines how to interpret the part. It contains two pieces of
    data: the part type, and the part parameters.

    The part type is used to route to an application level handler that can
    interpret the payload.

    Part parameters are passed to the application level handler. They are
    meant to convey information that will help the application level object
    interpret the part payload.

    The binary format of the header is as follows:

    :typesize: (one byte)

    :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

    :partid: A 32-bit integer (unique in the bundle) that can be used to refer
             to this part.

    :parameters:

        A part's parameters may have arbitrary content; the binary structure
        is::

            <mandatory-count><advisory-count><param-sizes><param-data>

        :mandatory-count: 1 byte, number of mandatory parameters

        :advisory-count: 1 byte, number of advisory parameters

        :param-sizes:

            N pairs of bytes, where N is the total number of parameters. Each
            pair contains (<size-of-key>, <size-of-value>) for one parameter.

        :param-data:

            A blob of bytes from which each parameter key and value can be
            retrieved using the list of size pairs stored in the previous
            field.

            Mandatory parameters come first, then the advisory ones.

            Each parameter's key MUST be unique within the part.

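A decoder for this parameter block could read (an editor's sketch, not part
of the original text; the helper name is made up)::

    import struct

    def decodepartparams(data):
        mancount, advcount = struct.unpack('>BB', data[:2])
        total = mancount + advcount
        fmt = '>' + ('BB' * total)
        end = 2 + struct.calcsize(fmt)
        sizes = struct.unpack(fmt, data[2:end])
        params, offset = [], end
        for i in range(total):
            keysize, valsize = sizes[2 * i], sizes[2 * i + 1]
            key = data[offset:offset + keysize]
            offset += keysize
            value = data[offset:offset + valsize]
            offset += valsize
            params.append((key, value))
        # mandatory parameters first, advisory ones after
        return params[:mancount], params[mancount:]
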
:payload:

    The payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32 and `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.
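
    For example (an editor's illustration), the framing for a payload of
    ``'data'`` is::

        struct.pack('>i', 4) + 'data' + struct.pack('>i', 0)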

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part
type contains any uppercase character it is considered mandatory. When no
handler is known for a mandatory part, the process is aborted and an
exception is raised. If the part is advisory and no handler is known, the
part is ignored. When the process is aborted, the full bundle is still read
from the stream to keep the channel usable. But none of the parts read after
an abort are processed. In the future, dropping the stream may become an
option for channels we do not care to preserve.
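
For example (an editor's illustration), a part received with type ``OUTPUT``
is treated as mandatory while one received as ``output`` is advisory; both
are matched against the handler registered for ``output``.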
146 """
146 """
147
147
148 import errno
148 import errno
149 import sys
149 import sys
150 import util
150 import util
151 import struct
151 import struct
152 import urllib
152 import urllib
153 import string
153 import string
154 import obsolete
154 import obsolete
155 import pushkey
155 import pushkey
156 import url
156 import url
157 import re
157 import re
158
158
159 import changegroup, error
159 import changegroup, error
160 from i18n import _
160 from i18n import _
161
161
162 _pack = struct.pack
162 _pack = struct.pack
163 _unpack = struct.unpack
163 _unpack = struct.unpack
164
164
165 _fstreamparamsize = '>i'
165 _fstreamparamsize = '>i'
166 _fpartheadersize = '>i'
166 _fpartheadersize = '>i'
167 _fparttypesize = '>B'
167 _fparttypesize = '>B'
168 _fpartid = '>I'
168 _fpartid = '>I'
169 _fpayloadsize = '>i'
169 _fpayloadsize = '>i'
170 _fpartparamcount = '>BB'
170 _fpartparamcount = '>BB'
171
171
172 preferedchunksize = 4096
172 preferedchunksize = 4096
173
173
174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
175
175
176 def validateparttype(parttype):
176 def validateparttype(parttype):
177 """raise ValueError if a parttype contains invalid character"""
177 """raise ValueError if a parttype contains invalid character"""
178 if _parttypeforbidden.search(parttype):
178 if _parttypeforbidden.search(parttype):
179 raise ValueError(parttype)
179 raise ValueError(parttype)
180
180
181 def _makefpartparamsizes(nbparams):
181 def _makefpartparamsizes(nbparams):
182 """return a struct format to read part parameter sizes
182 """return a struct format to read part parameter sizes
183
183
184 The number parameters is variable so we need to build that format
184 The number parameters is variable so we need to build that format
185 dynamically.
185 dynamically.
186 """
186 """
187 return '>'+('BB'*nbparams)
187 return '>'+('BB'*nbparams)
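
# For instance (editor's note, not in the original source):
# _makefpartparamsizes(2) == '>BBBB', i.e. four unsigned bytes to read
# two (key size, value size) pairs.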

parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator
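
# Editor's sketch (hypothetical part type, not part of the original module;
# kept commented out so nothing is registered on import):
#
#   @parthandler('test:ping', ('who',))
#   def handleping(op, part):
#       op.ui.write('pong to %s\n' % part.params['who'])
#
# Sent as advisory (all lower case on the wire), such a part would simply be
# ignored by receivers that lack this handler.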

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)
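
# Editor's sketch (not part of the original module): typical use of the
# records object while processing an unbundle.
def _unbundlerecordsexample():
    records = unbundlerecords()
    records.add('changegroup', {'return': 1})
    records.add('output', 'remote: added 1 changesets\n', inreplyto=0)
    # entries are grouped by category...
    assert records['changegroup'] == ({'return': 1},)
    # ...and replies are also indexed by the part id they answer
    assert records.getreplies(0)['output'] == ('remote: added 1 changesets\n',)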

class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object currently has very little content; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.gettransaction = transactiongetter
        self.reply = None

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def processbundle(repo, unbundler, transactiongetter=None):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    This is a very early version of this function that will be strongly
    reworked before final usage.

    An unknown mandatory part will abort the process.
    """
    if transactiongetter is None:
        transactiongetter = _notransaction
    op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    iterparts = unbundler.iterparts()
    part = None
    try:
        for part in iterparts:
            _processpart(op, part)
    except Exception, exc:
        for part in iterparts:
            # consume the bundle content
            part.seek(0, 2)
        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from processing the old format. This is mostly needed
        # to handle different return codes to unbundle according to the type
        # of bundle. We should probably clean up or drop this return code
        # craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
        exc._bundle2salvagedoutput = salvaged
        raise
    return op
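
# Editor's sketch (hypothetical caller, not part of the original module):
# how the attributes attached in the except branch above can be consumed.
def _processbundle2failureexample(repo, unbundler):
    try:
        return processbundle(repo, unbundler)
    except Exception, exc:
        if getattr(exc, 'duringunbundle2', False):
            # forward the server output salvaged by processbundle
            for part in getattr(exc, '_bundle2salvagedoutput', ()):
                repo.ui.status('remote: %s\n' % part.data)
        raise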

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                raise error.UnsupportedPartError(parttype=part.type)
            op.ui.debug('found a handler for part %r\n' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                raise error.UnsupportedPartError(parttype=part.type,
                                                 params=unknownparams)
        except error.UnsupportedPartError, exc:
            if part.mandatory: # mandatory parts
                raise
            op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
            return # skip to part processing

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.reply is not None:
            op.ui.pushbuffer(error=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam('in-reply-to', str(part.id), mandatory=False)
    finally:
        # consume the part content to not corrupt the stream.
        part.seek(0, 2)


def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urllib.unquote(key)
        vals = [urllib.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urllib.quote(ca)
        vals = [urllib.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)
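
# Editor's sketch (not part of the original module): encodecaps and
# decodecaps round-trip, with values always coming back as lists.
def _capsroundtripexample():
    caps = {'HG20': [], 'changegroup': ['01', '02']}
    blob = encodecaps(caps)
    # blob is now 'HG20\nchangegroup=01,02'
    assert decodecaps(blob) == caps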

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart`
    to populate it with parts. Then call `getchunks` to retrieve all the
    binary chunks of data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        self.ui.debug('start emission of %s stream\n' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        self.ui.debug('bundle parameter: %s\n' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param

        self.ui.debug('start of parts\n')
        for part in self._parts:
            self.ui.debug('bundle part: "%s"\n' % part.type)
            for chunk in part.getchunks():
                yield chunk
        self.ui.debug('end of bundle\n')
        yield _pack(_fpartheadersize, 0)

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urllib.quote(par)
            if value is not None:
                value = urllib.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged
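
# Editor's sketch (not part of the original module): assembling a small
# bundle entirely in memory.
def _bundle20example(ui):
    bundler = bundle20(ui)
    bundler.addparam('evolution')  # advisory stream level parameter
    part = bundler.newpart('output', data='hello, world', mandatory=False)
    part.addparam('in-reply-to', '0', mandatory=False)
    # concatenating the chunks yields the complete binary stream,
    # starting with the 'HG20' magic string
    return ''.join(bundler.getchunks())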


class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))

    def _unpack(self, format):
        """unpack this struct format from the stream"""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream"""
        return changegroup.readexactly(self._fp, size)

    def seek(self, offset, whence=0):
        """move the underlying file pointer"""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def tell(self):
        """return the file offset, or None if file is not seekable"""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError, e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

def getunbundler(ui, fp, header=None):
    """return a valid unbundler object for a given header"""
    if header is None:
        header = changegroup.readexactly(fp, 4)
    magic, version = header[0:2], header[2:4]
    if magic != 'HG':
        raise util.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise util.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    ui.debug('start processing of %s stream\n' % header)
    return unbundler
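
# Editor's sketch (not part of the original module): reading a stream back,
# e.g. one produced by bundle20.getchunks() and wrapped in a StringIO.
def _getunbundlerexample(ui, fp):
    unbundler = getunbundler(ui, fp)  # validates the 'HG' magic for us
    for part in unbundler.iterparts():
        ui.write('part %r (id %i)\n' % (part.type, part.id))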

class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` methods."""

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        self.ui.debug('reading bundle2 stream parameters\n')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            for p in self._readexact(paramssize).split(' '):
                p = p.split('=', 1)
                p = [urllib.unquote(i) for i in p]
                if len(p) < 2:
                    p.append(None)
                self._processparam(*p)
                params[p[0]] = p[1]
        return params

    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory and this function will raise a KeyError when they are
        unknown.

        Note: no options are currently supported. Any input will either be
        ignored or fail.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        # Some logic will be later added here to try to process the option for
        # a dict of known parameters.
        if name[0].islower():
            self.ui.debug("ignoring unknown parameter %r\n" % name)
        else:
            raise error.UnsupportedPartError(params=(name,))


    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        self.ui.debug('start extraction of bundle2 parts\n')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            part.seek(0, 2)
            headerblock = self._readpartheader()
        self.ui.debug('end of bundle2 stream\n')

    def _readpartheader(self):
        """reads a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        self.ui.debug('part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        return False

formatmap = {'20': unbundle20}

class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. The remote side
    should be able to safely ignore the advisory ones.

    Neither data nor parameters can be modified after the generation has
    begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise RuntimeError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to define the part content
    def __setdata(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data
    def __getdata(self):
        return self._data
    data = property(__getdata, __setdata)

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self._generated is not None:
            raise RuntimeError('part can only be consumed once')
        self._generated = False
        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                 ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except Exception, exc:
            # backup exception data for later
            exc_info = sys.exc_info()
            msg = 'unexpected error: %s' % exc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks():
                yield chunk
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            raise exc_info[0], exc_info[1], exc_info[2]
        # end of payload
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if util.safehasattr(self.data, 'next'):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


flaginterrupt = -1
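
# Editor's note (not in the original source): on the wire an interruption
# therefore shows up as a payload chunk size of -1, followed by one complete
# embedded part; the consumer then goes back to reading payload chunk sizes
# for the interrupted part.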
788
792
789 class interrupthandler(unpackermixin):
793 class interrupthandler(unpackermixin):
790 """read one part and process it with restricted capability
794 """read one part and process it with restricted capability
791
795
792 This allows to transmit exception raised on the producer size during part
796 This allows to transmit exception raised on the producer size during part
793 iteration while the consumer is reading a part.
797 iteration while the consumer is reading a part.
794
798
795 Part processed in this manner only have access to a ui object,"""
799 Part processed in this manner only have access to a ui object,"""
796
800
797 def __init__(self, ui, fp):
801 def __init__(self, ui, fp):
798 super(interrupthandler, self).__init__(fp)
802 super(interrupthandler, self).__init__(fp)
799 self.ui = ui
803 self.ui = ui
800
804
801 def _readpartheader(self):
805 def _readpartheader(self):
802 """reads a part header size and return the bytes blob
806 """reads a part header size and return the bytes blob
803
807
804 returns None if empty"""
808 returns None if empty"""
805 headersize = self._unpack(_fpartheadersize)[0]
809 headersize = self._unpack(_fpartheadersize)[0]
806 if headersize < 0:
810 if headersize < 0:
807 raise error.BundleValueError('negative part header size: %i'
811 raise error.BundleValueError('negative part header size: %i'
808 % headersize)
812 % headersize)
809 self.ui.debug('part header size: %i\n' % headersize)
813 self.ui.debug('part header size: %i\n' % headersize)
810 if headersize:
814 if headersize:
811 return self._readexact(headersize)
815 return self._readexact(headersize)
812 return None
816 return None
813
817
814 def __call__(self):
818 def __call__(self):
815 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
819 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
816 headerblock = self._readpartheader()
820 headerblock = self._readpartheader()
817 if headerblock is None:
821 if headerblock is None:
818 self.ui.debug('no part found during interruption.\n')
822 self.ui.debug('no part found during interruption.\n')
819 return
823 return
820 part = unbundlepart(self.ui, headerblock, self._fp)
824 part = unbundlepart(self.ui, headerblock, self._fp)
821 op = interruptoperation(self.ui)
825 op = interruptoperation(self.ui)
822 _processpart(op, part)
826 _processpart(op, part)
823
827
824 class interruptoperation(object):
828 class interruptoperation(object):
825 """A limited operation to be use by part handler during interruption
829 """A limited operation to be use by part handler during interruption
826
830
827 It only have access to an ui object.
831 It only have access to an ui object.
828 """
832 """
829
833
830 def __init__(self, ui):
834 def __init__(self, ui):
831 self.ui = ui
835 self.ui = ui
832 self.reply = None
836 self.reply = None
833
837
834 @property
838 @property
835 def repo(self):
839 def repo(self):
836 raise RuntimeError('no repo access from stream interruption')
840 raise RuntimeError('no repo access from stream interruption')
837
841
838 def gettransaction(self):
842 def gettransaction(self):
839 raise TransactionUnavailable('no repo access from stream interruption')
843 raise TransactionUnavailable('no repo access from stream interruption')
840
844
841 class unbundlepart(unpackermixin):
845 class unbundlepart(unpackermixin):
842 """a bundle part read from a bundle"""
846 """a bundle part read from a bundle"""
843
847
844 def __init__(self, ui, header, fp):
848 def __init__(self, ui, header, fp):
845 super(unbundlepart, self).__init__(fp)
849 super(unbundlepart, self).__init__(fp)
846 self.ui = ui
850 self.ui = ui
847 # unbundle state attr
851 # unbundle state attr
848 self._headerdata = header
852 self._headerdata = header
849 self._headeroffset = 0
853 self._headeroffset = 0
850 self._initialized = False
854 self._initialized = False
851 self.consumed = False
855 self.consumed = False
852 # part data
856 # part data
853 self.id = None
857 self.id = None
854 self.type = None
858 self.type = None
855 self.mandatoryparams = None
859 self.mandatoryparams = None
856 self.advisoryparams = None
860 self.advisoryparams = None
857 self.params = None
861 self.params = None
858 self.mandatorykeys = ()
862 self.mandatorykeys = ()
859 self._payloadstream = None
863 self._payloadstream = None
860 self._readheader()
864 self._readheader()
861 self._mandatory = None
865 self._mandatory = None
862 self._chunkindex = [] #(payload, file) position tuples for chunk starts
866 self._chunkindex = [] #(payload, file) position tuples for chunk starts
863 self._pos = 0
867 self._pos = 0
864
868
865 def _fromheader(self, size):
869 def _fromheader(self, size):
866 """return the next <size> byte from the header"""
870 """return the next <size> byte from the header"""
867 offset = self._headeroffset
871 offset = self._headeroffset
868 data = self._headerdata[offset:(offset + size)]
872 data = self._headerdata[offset:(offset + size)]
869 self._headeroffset = offset + size
873 self._headeroffset = offset + size
870 return data
874 return data
871
875
872 def _unpackheader(self, format):
876 def _unpackheader(self, format):
873 """read given format from header
877 """read given format from header
874
878
875 This automatically compute the size of the format to read."""
879 This automatically compute the size of the format to read."""
876 data = self._fromheader(struct.calcsize(format))
880 data = self._fromheader(struct.calcsize(format))
877 return _unpack(format, data)
881 return _unpack(format, data)
878
882
879 def _initparams(self, mandatoryparams, advisoryparams):
883 def _initparams(self, mandatoryparams, advisoryparams):
880 """internal function to setup all logic related parameters"""
884 """internal function to setup all logic related parameters"""
881 # make it read only to prevent people touching it by mistake.
885 # make it read only to prevent people touching it by mistake.
882 self.mandatoryparams = tuple(mandatoryparams)
886 self.mandatoryparams = tuple(mandatoryparams)
883 self.advisoryparams = tuple(advisoryparams)
887 self.advisoryparams = tuple(advisoryparams)
884 # user-friendly view of the parameters
888 # user-friendly view of the parameters
885 self.params = dict(self.mandatoryparams)
889 self.params = dict(self.mandatoryparams)
886 self.params.update(dict(self.advisoryparams))
890 self.params.update(dict(self.advisoryparams))
887 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
891 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
888
892
889 def _payloadchunks(self, chunknum=0):
893 def _payloadchunks(self, chunknum=0):
890 '''seek to specified chunk and start yielding data'''
894 '''seek to specified chunk and start yielding data'''
891 if len(self._chunkindex) == 0:
895 if len(self._chunkindex) == 0:
892 assert chunknum == 0, 'Must start with chunk 0'
896 assert chunknum == 0, 'Must start with chunk 0'
893 self._chunkindex.append((0, super(unbundlepart, self).tell()))
897 self._chunkindex.append((0, super(unbundlepart, self).tell()))
894 else:
898 else:
895 assert chunknum < len(self._chunkindex), \
899 assert chunknum < len(self._chunkindex), \
896 'Unknown chunk %d' % chunknum
900 'Unknown chunk %d' % chunknum
897 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
901 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
898
902
899 pos = self._chunkindex[chunknum][0]
903 pos = self._chunkindex[chunknum][0]
900 payloadsize = self._unpack(_fpayloadsize)[0]
904 payloadsize = self._unpack(_fpayloadsize)[0]
901 self.ui.debug('payload chunk size: %i\n' % payloadsize)
905 self.ui.debug('payload chunk size: %i\n' % payloadsize)
902 while payloadsize:
906 while payloadsize:
903 if payloadsize == flaginterrupt:
907 if payloadsize == flaginterrupt:
904 # interruption detection, the handler will now read a
908 # interruption detection, the handler will now read a
905 # single part and process it.
909 # single part and process it.
906 interrupthandler(self.ui, self._fp)()
910 interrupthandler(self.ui, self._fp)()
907 elif payloadsize < 0:
911 elif payloadsize < 0:
908 msg = 'negative payload chunk size: %i' % payloadsize
912 msg = 'negative payload chunk size: %i' % payloadsize
909 raise error.BundleValueError(msg)
913 raise error.BundleValueError(msg)
910 else:
914 else:
911 result = self._readexact(payloadsize)
915 result = self._readexact(payloadsize)
912 chunknum += 1
916 chunknum += 1
913 pos += payloadsize
917 pos += payloadsize
914 if chunknum == len(self._chunkindex):
918 if chunknum == len(self._chunkindex):
915 self._chunkindex.append((pos,
919 self._chunkindex.append((pos,
916 super(unbundlepart, self).tell()))
920 super(unbundlepart, self).tell()))
917 yield result
921 yield result
918 payloadsize = self._unpack(_fpayloadsize)[0]
922 payloadsize = self._unpack(_fpayloadsize)[0]
919 self.ui.debug('payload chunk size: %i\n' % payloadsize)
923 self.ui.debug('payload chunk size: %i\n' % payloadsize)
920
924
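The loop above shows how payload data is framed: a signed big-endian int32 length prefix followed by that many bytes, terminated by a zero size, with negative sizes reserved for special cases such as the interrupt flag. A minimal standalone sketch of that framing, ignoring the interrupt case; `iterchunks` and the sample data are illustrative, not part of the Mercurial API:

    import io
    import struct

    def iterchunks(fp):
        """yield length-prefixed chunks until the zero-size terminator"""
        size = struct.unpack('>i', fp.read(4))[0]  # signed, big-endian
        while size:
            yield fp.read(size)
            size = struct.unpack('>i', fp.read(4))[0]

    # two chunks followed by the terminator
    raw = (struct.pack('>i', 3) + b'abc' +
           struct.pack('>i', 2) + b'de' +
           struct.pack('>i', 0))
    assert list(iterchunks(io.BytesIO(raw))) == [b'abc', b'de']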
921 def _findchunk(self, pos):
925 def _findchunk(self, pos):
922 '''for a given payload position, return a chunk number and offset'''
926 '''for a given payload position, return a chunk number and offset'''
923 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
927 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
924 if ppos == pos:
928 if ppos == pos:
925 return chunk, 0
929 return chunk, 0
926 elif ppos > pos:
930 elif ppos > pos:
927 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
931 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
928 raise ValueError('Unknown chunk')
932 raise ValueError('Unknown chunk')
929
933
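The `(payload, file)` tuples recorded by `_payloadchunks` make `_findchunk` a linear scan that returns the chunk containing a payload offset plus the remainder inside that chunk. The same lookup as a standalone sketch over a made-up index (all positions hypothetical):

    def findchunk(chunkindex, pos):
        """return (chunk number, offset within that chunk) for payload pos"""
        for chunk, (ppos, fpos) in enumerate(chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    index = [(0, 9), (4096, 4109), (8192, 8209)]  # made-up positions
    assert findchunk(index, 0) == (0, 0)
    assert findchunk(index, 5000) == (1, 904)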
930 def _readheader(self):
934 def _readheader(self):
931 """read the header and setup the object"""
935 """read the header and setup the object"""
932 typesize = self._unpackheader(_fparttypesize)[0]
936 typesize = self._unpackheader(_fparttypesize)[0]
933 self.type = self._fromheader(typesize)
937 self.type = self._fromheader(typesize)
934 self.ui.debug('part type: "%s"\n' % self.type)
938 self.ui.debug('part type: "%s"\n' % self.type)
935 self.id = self._unpackheader(_fpartid)[0]
939 self.id = self._unpackheader(_fpartid)[0]
936 self.ui.debug('part id: "%s"\n' % self.id)
940 self.ui.debug('part id: "%s"\n' % self.id)
937 # extract mandatory bit from type
941 # extract mandatory bit from type
938 self.mandatory = (self.type != self.type.lower())
942 self.mandatory = (self.type != self.type.lower())
939 self.type = self.type.lower()
943 self.type = self.type.lower()
940 ## reading parameters
944 ## reading parameters
941 # param count
945 # param count
942 mancount, advcount = self._unpackheader(_fpartparamcount)
946 mancount, advcount = self._unpackheader(_fpartparamcount)
943 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
947 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
944 # param size
948 # param size
945 fparamsizes = _makefpartparamsizes(mancount + advcount)
949 fparamsizes = _makefpartparamsizes(mancount + advcount)
946 paramsizes = self._unpackheader(fparamsizes)
950 paramsizes = self._unpackheader(fparamsizes)
947 # make it a list of pairs again
951 # make it a list of pairs again
948 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
952 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
949 # split mandatory from advisory
953 # split mandatory from advisory
950 mansizes = paramsizes[:mancount]
954 mansizes = paramsizes[:mancount]
951 advsizes = paramsizes[mancount:]
955 advsizes = paramsizes[mancount:]
952 # retrieve param value
956 # retrieve param value
953 manparams = []
957 manparams = []
954 for key, value in mansizes:
958 for key, value in mansizes:
955 manparams.append((self._fromheader(key), self._fromheader(value)))
959 manparams.append((self._fromheader(key), self._fromheader(value)))
956 advparams = []
960 advparams = []
957 for key, value in advsizes:
961 for key, value in advsizes:
958 advparams.append((self._fromheader(key), self._fromheader(value)))
962 advparams.append((self._fromheader(key), self._fromheader(value)))
959 self._initparams(manparams, advparams)
963 self._initparams(manparams, advparams)
960 ## part payload
964 ## part payload
961 self._payloadstream = util.chunkbuffer(self._payloadchunks())
965 self._payloadstream = util.chunkbuffer(self._payloadchunks())
962 # the header data has been read; record it
966 # the header data has been read; record it
963 self._initialized = True
967 self._initialized = True
964
968
965 def read(self, size=None):
969 def read(self, size=None):
966 """read payload data"""
970 """read payload data"""
967 if not self._initialized:
971 if not self._initialized:
968 self._readheader()
972 self._readheader()
969 if size is None:
973 if size is None:
970 data = self._payloadstream.read()
974 data = self._payloadstream.read()
971 else:
975 else:
972 data = self._payloadstream.read(size)
976 data = self._payloadstream.read(size)
973 if size is None or len(data) < size:
977 if size is None or len(data) < size:
974 self.consumed = True
978 self.consumed = True
975 self._pos += len(data)
979 self._pos += len(data)
976 return data
980 return data
977
981
978 def tell(self):
982 def tell(self):
979 return self._pos
983 return self._pos
980
984
981 def seek(self, offset, whence=0):
985 def seek(self, offset, whence=0):
982 if whence == 0:
986 if whence == 0:
983 newpos = offset
987 newpos = offset
984 elif whence == 1:
988 elif whence == 1:
985 newpos = self._pos + offset
989 newpos = self._pos + offset
986 elif whence == 2:
990 elif whence == 2:
987 if not self.consumed:
991 if not self.consumed:
988 self.read()
992 self.read()
989 newpos = self._chunkindex[-1][0] - offset
993 newpos = self._chunkindex[-1][0] - offset
990 else:
994 else:
991 raise ValueError('Unknown whence value: %r' % (whence,))
995 raise ValueError('Unknown whence value: %r' % (whence,))
992
996
993 if newpos > self._chunkindex[-1][0] and not self.consumed:
997 if newpos > self._chunkindex[-1][0] and not self.consumed:
994 self.read()
998 self.read()
995 if not 0 <= newpos <= self._chunkindex[-1][0]:
999 if not 0 <= newpos <= self._chunkindex[-1][0]:
996 raise ValueError('Offset out of range')
1000 raise ValueError('Offset out of range')
997
1001
998 if self._pos != newpos:
1002 if self._pos != newpos:
999 chunk, internaloffset = self._findchunk(newpos)
1003 chunk, internaloffset = self._findchunk(newpos)
1000 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1004 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1001 adjust = self.read(internaloffset)
1005 adjust = self.read(internaloffset)
1002 if len(adjust) != internaloffset:
1006 if len(adjust) != internaloffset:
1003 raise util.Abort(_('Seek failed\n'))
1007 raise util.Abort(_('Seek failed\n'))
1004 self._pos = newpos
1008 self._pos = newpos
1005
1009
1006 capabilities = {'HG20': (),
1010 capabilities = {'HG20': (),
1007 'listkeys': (),
1011 'listkeys': (),
1008 'pushkey': (),
1012 'pushkey': (),
1009 'digests': tuple(sorted(util.DIGESTS.keys())),
1013 'digests': tuple(sorted(util.DIGESTS.keys())),
1010 'remote-changegroup': ('http', 'https'),
1014 'remote-changegroup': ('http', 'https'),
1011 }
1015 }
1012
1016
1013 def getrepocaps(repo, allowpushback=False):
1017 def getrepocaps(repo, allowpushback=False):
1014 """return the bundle2 capabilities for a given repo
1018 """return the bundle2 capabilities for a given repo
1015
1019
1016 Exists to allow extensions (like evolution) to mutate the capabilities.
1020 Exists to allow extensions (like evolution) to mutate the capabilities.
1017 """
1021 """
1018 caps = capabilities.copy()
1022 caps = capabilities.copy()
1019 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1023 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1020 if obsolete.isenabled(repo, obsolete.exchangeopt):
1024 if obsolete.isenabled(repo, obsolete.exchangeopt):
1021 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1025 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1022 caps['obsmarkers'] = supportedformat
1026 caps['obsmarkers'] = supportedformat
1023 if allowpushback:
1027 if allowpushback:
1024 caps['pushback'] = ()
1028 caps['pushback'] = ()
1025 return caps
1029 return caps
1026
1030
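Since `getrepocaps` exists precisely so extensions can mutate the advertised capabilities, a hypothetical extension could wrap it as sketched below; the capability name `x-myfeature` is made up, while `extensions.wrapfunction` is the standard wrapping helper:

    # hypothetical extension snippet advertising an extra capability
    from mercurial import bundle2, extensions

    def extsetup(ui):
        def wrappedcaps(orig, repo, **kwargs):
            caps = orig(repo, **kwargs)
            caps['x-myfeature'] = ()  # made-up capability, no sub-values
            return caps
        extensions.wrapfunction(bundle2, 'getrepocaps', wrappedcaps)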
1027 def bundle2caps(remote):
1031 def bundle2caps(remote):
1028 """return the bundle capabilities of a peer as dict"""
1032 """return the bundle capabilities of a peer as dict"""
1029 raw = remote.capable('bundle2')
1033 raw = remote.capable('bundle2')
1030 if not raw and raw != '':
1034 if not raw and raw != '':
1031 return {}
1035 return {}
1032 capsblob = urllib.unquote(remote.capable('bundle2'))
1036 capsblob = urllib.unquote(remote.capable('bundle2'))
1033 return decodecaps(capsblob)
1037 return decodecaps(capsblob)
1034
1038
1035 def obsmarkersversion(caps):
1039 def obsmarkersversion(caps):
1036 """extract the list of supported obsmarkers versions from a bundle2caps dict
1040 """extract the list of supported obsmarkers versions from a bundle2caps dict
1037 """
1041 """
1038 obscaps = caps.get('obsmarkers', ())
1042 obscaps = caps.get('obsmarkers', ())
1039 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1043 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1040
1044
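So a peer advertising `('V0', 'V1')` yields `[0, 1]`, and entries not shaped like `V<n>` are silently skipped. A quick illustrative check (standalone copy of the helper above):

    def obsmarkersversion(caps):
        obscaps = caps.get('obsmarkers', ())
        return [int(c[1:]) for c in obscaps if c.startswith('V')]

    assert obsmarkersversion({'obsmarkers': ('V0', 'V1', 'bogus')}) == [0, 1]
    assert obsmarkersversion({}) == []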
1041 @parthandler('changegroup', ('version',))
1045 @parthandler('changegroup', ('version',))
1042 def handlechangegroup(op, inpart):
1046 def handlechangegroup(op, inpart):
1043 """apply a changegroup part on the repo
1047 """apply a changegroup part on the repo
1044
1048
1045 This is a very early implementation that will be massively reworked before
1049 This is a very early implementation that will be massively reworked before
1046 being inflicted on any end-user.
1050 being inflicted on any end-user.
1047 """
1051 """
1048 # Make sure we trigger a transaction creation
1052 # Make sure we trigger a transaction creation
1049 #
1053 #
1050 # The addchangegroup function will get a transaction object by itself, but
1054 # The addchangegroup function will get a transaction object by itself, but
1051 # we need to make sure we trigger the creation of a transaction object used
1055 # we need to make sure we trigger the creation of a transaction object used
1052 # for the whole processing scope.
1056 # for the whole processing scope.
1053 op.gettransaction()
1057 op.gettransaction()
1054 unpackerversion = inpart.params.get('version', '01')
1058 unpackerversion = inpart.params.get('version', '01')
1055 # We should raise an appropriate exception here
1059 # We should raise an appropriate exception here
1056 unpacker = changegroup.packermap[unpackerversion][1]
1060 unpacker = changegroup.packermap[unpackerversion][1]
1057 cg = unpacker(inpart, 'UN')
1061 cg = unpacker(inpart, 'UN')
1058 # the source and url passed here are overwritten by the ones contained in
1062 # the source and url passed here are overwritten by the ones contained in
1059 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1063 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1060 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1064 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1061 op.records.add('changegroup', {'return': ret})
1065 op.records.add('changegroup', {'return': ret})
1062 if op.reply is not None:
1066 if op.reply is not None:
1063 # This is definitely not the final form of this
1067 # This is definitely not the final form of this
1064 # return. But one needs to start somewhere.
1068 # return. But one needs to start somewhere.
1065 part = op.reply.newpart('reply:changegroup', mandatory=False)
1069 part = op.reply.newpart('reply:changegroup', mandatory=False)
1066 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1070 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1067 part.addparam('return', '%i' % ret, mandatory=False)
1071 part.addparam('return', '%i' % ret, mandatory=False)
1068 assert not inpart.read()
1072 assert not inpart.read()
1069
1073
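`@parthandler` is the registration point for new part types, and the handler above shows the usual shape: take a transaction if needed, drain the payload, record a result, and optionally emit a reply part. A hypothetical handler following the same pattern; the part type `x-example:greeting` and its `name` parameter are made up:

    # hypothetical bundle2 part handler, mirroring the pattern above
    from mercurial import bundle2

    @bundle2.parthandler('x-example:greeting', ('name',))
    def handlegreeting(op, inpart):
        """record a greeting carried in a made-up experimental part"""
        name = inpart.params['name']
        data = inpart.read()  # always drain the payload
        op.ui.write('greeting for %s: %s\n' % (name, data))
        op.records.add('x-example:greeting', {'name': name})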
1070 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1074 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1071 ['digest:%s' % k for k in util.DIGESTS.keys()])
1075 ['digest:%s' % k for k in util.DIGESTS.keys()])
1072 @parthandler('remote-changegroup', _remotechangegroupparams)
1076 @parthandler('remote-changegroup', _remotechangegroupparams)
1073 def handleremotechangegroup(op, inpart):
1077 def handleremotechangegroup(op, inpart):
1074 """apply a bundle10 on the repo, given an url and validation information
1078 """apply a bundle10 on the repo, given an url and validation information
1075
1079
1076 All the information about the remote bundle to import is given as
1080 All the information about the remote bundle to import is given as
1077 parameters. The parameters include:
1081 parameters. The parameters include:
1078 - url: the url to the bundle10.
1082 - url: the url to the bundle10.
1079 - size: the bundle10 file size. It is used to validate that what was
1083 - size: the bundle10 file size. It is used to validate that what was
1080 retrieved by the client matches the server's knowledge of the bundle.
1084 retrieved by the client matches the server's knowledge of the bundle.
1081 - digests: a space separated list of the digest types provided as
1085 - digests: a space separated list of the digest types provided as
1082 parameters.
1086 parameters.
1083 - digest:<digest-type>: the hexadecimal representation of the digest with
1087 - digest:<digest-type>: the hexadecimal representation of the digest with
1084 that name. Like the size, it is used to validate that what was retrieved by
1088 that name. Like the size, it is used to validate that what was retrieved by
1085 the client matches what the server knows about the bundle.
1089 the client matches what the server knows about the bundle.
1086
1090
1087 When multiple digest types are given, all of them are checked.
1091 When multiple digest types are given, all of them are checked.
1088 """
1092 """
1089 try:
1093 try:
1090 raw_url = inpart.params['url']
1094 raw_url = inpart.params['url']
1091 except KeyError:
1095 except KeyError:
1092 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1096 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1093 parsed_url = util.url(raw_url)
1097 parsed_url = util.url(raw_url)
1094 if parsed_url.scheme not in capabilities['remote-changegroup']:
1098 if parsed_url.scheme not in capabilities['remote-changegroup']:
1095 raise util.Abort(_('remote-changegroup does not support %s urls') %
1099 raise util.Abort(_('remote-changegroup does not support %s urls') %
1096 parsed_url.scheme)
1100 parsed_url.scheme)
1097
1101
1098 try:
1102 try:
1099 size = int(inpart.params['size'])
1103 size = int(inpart.params['size'])
1100 except ValueError:
1104 except ValueError:
1101 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1105 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1102 % 'size')
1106 % 'size')
1103 except KeyError:
1107 except KeyError:
1104 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1108 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1105
1109
1106 digests = {}
1110 digests = {}
1107 for typ in inpart.params.get('digests', '').split():
1111 for typ in inpart.params.get('digests', '').split():
1108 param = 'digest:%s' % typ
1112 param = 'digest:%s' % typ
1109 try:
1113 try:
1110 value = inpart.params[param]
1114 value = inpart.params[param]
1111 except KeyError:
1115 except KeyError:
1112 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1116 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1113 param)
1117 param)
1114 digests[typ] = value
1118 digests[typ] = value
1115
1119
1116 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1120 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1117
1121
1118 # Make sure we trigger a transaction creation
1122 # Make sure we trigger a transaction creation
1119 #
1123 #
1120 # The addchangegroup function will get a transaction object by itself, but
1124 # The addchangegroup function will get a transaction object by itself, but
1121 # we need to make sure we trigger the creation of a transaction object used
1125 # we need to make sure we trigger the creation of a transaction object used
1122 # for the whole processing scope.
1126 # for the whole processing scope.
1123 op.gettransaction()
1127 op.gettransaction()
1124 import exchange
1128 import exchange
1125 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1129 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1126 if not isinstance(cg, changegroup.cg1unpacker):
1130 if not isinstance(cg, changegroup.cg1unpacker):
1127 raise util.Abort(_('%s: not a bundle version 1.0') %
1131 raise util.Abort(_('%s: not a bundle version 1.0') %
1128 util.hidepassword(raw_url))
1132 util.hidepassword(raw_url))
1129 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1133 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1130 op.records.add('changegroup', {'return': ret})
1134 op.records.add('changegroup', {'return': ret})
1131 if op.reply is not None:
1135 if op.reply is not None:
1132 # This is definitely not the final form of this
1136 # This is definitely not the final form of this
1133 # return. But one needs to start somewhere.
1137 # return. But one needs to start somewhere.
1134 part = op.reply.newpart('reply:changegroup')
1138 part = op.reply.newpart('reply:changegroup')
1135 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1139 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1136 part.addparam('return', '%i' % ret, mandatory=False)
1140 part.addparam('return', '%i' % ret, mandatory=False)
1137 try:
1141 try:
1138 real_part.validate()
1142 real_part.validate()
1139 except util.Abort, e:
1143 except util.Abort, e:
1140 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1144 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1141 (util.hidepassword(raw_url), str(e)))
1145 (util.hidepassword(raw_url), str(e)))
1142 assert not inpart.read()
1146 assert not inpart.read()
1143
1147
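The size and digest checks delegate to `util.digestchecker`; the underlying idea is simply hashing the downloaded bytes and comparing against the advertised values. A simplified standalone sketch with `hashlib` (`checkpayload` is illustrative, not the Mercurial helper):

    import hashlib

    def checkpayload(data, size, digests):
        """raise ValueError unless data matches the advertised size/digests

        digests maps a hashlib algorithm name to an expected hex digest."""
        if len(data) != size:
            raise ValueError('size mismatch: %d != %d' % (len(data), size))
        for algo, expected in digests.items():
            actual = hashlib.new(algo, data).hexdigest()
            if actual != expected:
                raise ValueError('%s mismatch: %s != %s'
                                 % (algo, actual, expected))

    payload = b'changegroup bytes'
    checkpayload(payload, len(payload),
                 {'sha1': hashlib.sha1(payload).hexdigest()})  # passes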
1144 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1148 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1145 def handlereplychangegroup(op, inpart):
1149 def handlereplychangegroup(op, inpart):
1146 ret = int(inpart.params['return'])
1150 ret = int(inpart.params['return'])
1147 replyto = int(inpart.params['in-reply-to'])
1151 replyto = int(inpart.params['in-reply-to'])
1148 op.records.add('changegroup', {'return': ret}, replyto)
1152 op.records.add('changegroup', {'return': ret}, replyto)
1149
1153
1150 @parthandler('check:heads')
1154 @parthandler('check:heads')
1151 def handlecheckheads(op, inpart):
1155 def handlecheckheads(op, inpart):
1152 """check that head of the repo did not change
1156 """check that head of the repo did not change
1153
1157
1154 This is used to detect a push race when using unbundle.
1158 This is used to detect a push race when using unbundle.
1155 This replaces the "heads" argument of unbundle."""
1159 This replaces the "heads" argument of unbundle."""
1156 h = inpart.read(20)
1160 h = inpart.read(20)
1157 heads = []
1161 heads = []
1158 while len(h) == 20:
1162 while len(h) == 20:
1159 heads.append(h)
1163 heads.append(h)
1160 h = inpart.read(20)
1164 h = inpart.read(20)
1161 assert not h
1165 assert not h
1162 if heads != op.repo.heads():
1166 if heads != op.repo.heads():
1163 raise error.PushRaced('repository changed while pushing - '
1167 raise error.PushRaced('repository changed while pushing - '
1164 'please try again')
1168 'please try again')
1165
1169
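The part payload here is nothing more than a concatenation of 20-byte binary node ids, which is why the loop reads fixed-size slices and insists on an empty tail. The same framing as a standalone sketch over an in-memory stream (the node values are fake):

    import io

    def readheads(fp):
        """read 20-byte node ids until the stream is exhausted"""
        heads = []
        h = fp.read(20)
        while len(h) == 20:
            heads.append(h)
            h = fp.read(20)
        assert not h, 'payload size must be a multiple of 20'
        return heads

    stream = io.BytesIO(b'\x11' * 20 + b'\x22' * 20)  # two fake node ids
    assert len(readheads(stream)) == 2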
1166 @parthandler('output')
1170 @parthandler('output')
1167 def handleoutput(op, inpart):
1171 def handleoutput(op, inpart):
1168 """forward output captured on the server to the client"""
1172 """forward output captured on the server to the client"""
1169 for line in inpart.read().splitlines():
1173 for line in inpart.read().splitlines():
1170 op.ui.write(('remote: %s\n' % line))
1174 op.ui.write(('remote: %s\n' % line))
1171
1175
1172 @parthandler('replycaps')
1176 @parthandler('replycaps')
1173 def handlereplycaps(op, inpart):
1177 def handlereplycaps(op, inpart):
1174 """Notify that a reply bundle should be created
1178 """Notify that a reply bundle should be created
1175
1179
1176 The payload contains the capabilities information for the reply"""
1180 The payload contains the capabilities information for the reply"""
1177 caps = decodecaps(inpart.read())
1181 caps = decodecaps(inpart.read())
1178 if op.reply is None:
1182 if op.reply is None:
1179 op.reply = bundle20(op.ui, caps)
1183 op.reply = bundle20(op.ui, caps)
1180
1184
1181 @parthandler('error:abort', ('message', 'hint'))
1185 @parthandler('error:abort', ('message', 'hint'))
1182 def handleerrorabort(op, inpart):
1186 def handleerrorabort(op, inpart):
1183 """Used to transmit abort error over the wire"""
1187 """Used to transmit abort error over the wire"""
1184 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1188 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1185
1189
1186 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1190 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1187 def handleerrorunsupportedcontent(op, inpart):
1191 def handleerrorunsupportedcontent(op, inpart):
1188 """Used to transmit unknown content error over the wire"""
1192 """Used to transmit unknown content error over the wire"""
1189 kwargs = {}
1193 kwargs = {}
1190 parttype = inpart.params.get('parttype')
1194 parttype = inpart.params.get('parttype')
1191 if parttype is not None:
1195 if parttype is not None:
1192 kwargs['parttype'] = parttype
1196 kwargs['parttype'] = parttype
1193 params = inpart.params.get('params')
1197 params = inpart.params.get('params')
1194 if params is not None:
1198 if params is not None:
1195 kwargs['params'] = params.split('\0')
1199 kwargs['params'] = params.split('\0')
1196
1200
1197 raise error.UnsupportedPartError(**kwargs)
1201 raise error.UnsupportedPartError(**kwargs)
1198
1202
1199 @parthandler('error:pushraced', ('message',))
1203 @parthandler('error:pushraced', ('message',))
1200 def handleerrorpushraced(op, inpart):
1204 def handleerrorpushraced(op, inpart):
1201 """Used to transmit push race error over the wire"""
1205 """Used to transmit push race error over the wire"""
1202 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1206 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1203
1207
1204 @parthandler('listkeys', ('namespace',))
1208 @parthandler('listkeys', ('namespace',))
1205 def handlelistkeys(op, inpart):
1209 def handlelistkeys(op, inpart):
1206 """retrieve pushkey namespace content stored in a bundle2"""
1210 """retrieve pushkey namespace content stored in a bundle2"""
1207 namespace = inpart.params['namespace']
1211 namespace = inpart.params['namespace']
1208 r = pushkey.decodekeys(inpart.read())
1212 r = pushkey.decodekeys(inpart.read())
1209 op.records.add('listkeys', (namespace, r))
1213 op.records.add('listkeys', (namespace, r))
1210
1214
1211 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1215 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1212 def handlepushkey(op, inpart):
1216 def handlepushkey(op, inpart):
1213 """process a pushkey request"""
1217 """process a pushkey request"""
1214 dec = pushkey.decode
1218 dec = pushkey.decode
1215 namespace = dec(inpart.params['namespace'])
1219 namespace = dec(inpart.params['namespace'])
1216 key = dec(inpart.params['key'])
1220 key = dec(inpart.params['key'])
1217 old = dec(inpart.params['old'])
1221 old = dec(inpart.params['old'])
1218 new = dec(inpart.params['new'])
1222 new = dec(inpart.params['new'])
1219 ret = op.repo.pushkey(namespace, key, old, new)
1223 ret = op.repo.pushkey(namespace, key, old, new)
1220 record = {'namespace': namespace,
1224 record = {'namespace': namespace,
1221 'key': key,
1225 'key': key,
1222 'old': old,
1226 'old': old,
1223 'new': new}
1227 'new': new}
1224 op.records.add('pushkey', record)
1228 op.records.add('pushkey', record)
1225 if op.reply is not None:
1229 if op.reply is not None:
1226 rpart = op.reply.newpart('reply:pushkey')
1230 rpart = op.reply.newpart('reply:pushkey')
1227 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1231 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1228 rpart.addparam('return', '%i' % ret, mandatory=False)
1232 rpart.addparam('return', '%i' % ret, mandatory=False)
1229
1233
1230 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1234 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1231 def handlepushkeyreply(op, inpart):
1235 def handlepushkeyreply(op, inpart):
1232 """retrieve the result of a pushkey request"""
1236 """retrieve the result of a pushkey request"""
1233 ret = int(inpart.params['return'])
1237 ret = int(inpart.params['return'])
1234 partid = int(inpart.params['in-reply-to'])
1238 partid = int(inpart.params['in-reply-to'])
1235 op.records.add('pushkey', {'return': ret}, partid)
1239 op.records.add('pushkey', {'return': ret}, partid)
1236
1240
1237 @parthandler('obsmarkers')
1241 @parthandler('obsmarkers')
1238 def handleobsmarker(op, inpart):
1242 def handleobsmarker(op, inpart):
1239 """add a stream of obsmarkers to the repo"""
1243 """add a stream of obsmarkers to the repo"""
1240 tr = op.gettransaction()
1244 tr = op.gettransaction()
1241 markerdata = inpart.read()
1245 markerdata = inpart.read()
1242 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1246 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1243 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1247 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1244 % len(markerdata))
1248 % len(markerdata))
1245 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1249 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1246 if new:
1250 if new:
1247 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1251 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1248 op.records.add('obsmarkers', {'new': new})
1252 op.records.add('obsmarkers', {'new': new})
1249 if op.reply is not None:
1253 if op.reply is not None:
1250 rpart = op.reply.newpart('reply:obsmarkers')
1254 rpart = op.reply.newpart('reply:obsmarkers')
1251 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1255 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1252 rpart.addparam('new', '%i' % new, mandatory=False)
1256 rpart.addparam('new', '%i' % new, mandatory=False)
1253
1257
1254
1258
1255 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1259 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1256 def handleobsmarkerreply(op, inpart):
1260 def handleobsmarkerreply(op, inpart):
1257 """retrieve the result of an obsmarkers request"""
1261 """retrieve the result of an obsmarkers request"""
1258 ret = int(inpart.params['new'])
1262 ret = int(inpart.params['new'])
1259 partid = int(inpart.params['in-reply-to'])
1263 partid = int(inpart.params['in-reply-to'])
1260 op.records.add('obsmarkers', {'new': ret}, partid)
1264 op.records.add('obsmarkers', {'new': ret}, partid)
@@ -1,1308 +1,1311 b''
1 # exchange.py - utility to exchange data between repos.
1 # exchange.py - utility to exchange data between repos.
2 #
2 #
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from i18n import _
8 from i18n import _
9 from node import hex, nullid
9 from node import hex, nullid
10 import errno, urllib
10 import errno, urllib
11 import util, scmutil, changegroup, base85, error
11 import util, scmutil, changegroup, base85, error
12 import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey
12 import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey
13 import lock as lockmod
13 import lock as lockmod
14
14
15 def readbundle(ui, fh, fname, vfs=None):
15 def readbundle(ui, fh, fname, vfs=None):
16 header = changegroup.readexactly(fh, 4)
16 header = changegroup.readexactly(fh, 4)
17
17
18 alg = None
18 alg = None
19 if not fname:
19 if not fname:
20 fname = "stream"
20 fname = "stream"
21 if not header.startswith('HG') and header.startswith('\0'):
21 if not header.startswith('HG') and header.startswith('\0'):
22 fh = changegroup.headerlessfixup(fh, header)
22 fh = changegroup.headerlessfixup(fh, header)
23 header = "HG10"
23 header = "HG10"
24 alg = 'UN'
24 alg = 'UN'
25 elif vfs:
25 elif vfs:
26 fname = vfs.join(fname)
26 fname = vfs.join(fname)
27
27
28 magic, version = header[0:2], header[2:4]
28 magic, version = header[0:2], header[2:4]
29
29
30 if magic != 'HG':
30 if magic != 'HG':
31 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
31 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
32 if version == '10':
32 if version == '10':
33 if alg is None:
33 if alg is None:
34 alg = changegroup.readexactly(fh, 2)
34 alg = changegroup.readexactly(fh, 2)
35 return changegroup.cg1unpacker(fh, alg)
35 return changegroup.cg1unpacker(fh, alg)
36 elif version.startswith('2'):
36 elif version.startswith('2'):
37 return bundle2.getunbundler(ui, fh, header=magic + version)
37 return bundle2.getunbundler(ui, fh, header=magic + version)
38 else:
38 else:
39 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
39 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
40
40
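`readbundle` dispatches purely on the 4-byte magic-plus-version header: `HG10` goes to the changegroup unpacker, `HG2*` to bundle2. A standalone sketch of that dispatch (the return values are made up for illustration):

    def bundleversion(header):
        """classify a 4-byte bundle header, mirroring the dispatch above"""
        magic, version = header[0:2], header[2:4]
        if magic != 'HG':
            raise ValueError('not a Mercurial bundle')
        if version == '10':
            return 'bundle1'
        elif version.startswith('2'):
            return 'bundle2'
        raise ValueError('unknown bundle version %s' % version)

    assert bundleversion('HG10') == 'bundle1'
    assert bundleversion('HG20') == 'bundle2'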
41 def buildobsmarkerspart(bundler, markers):
41 def buildobsmarkerspart(bundler, markers):
42 """add an obsmarker part to the bundler with <markers>
42 """add an obsmarker part to the bundler with <markers>
43
43
44 No part is created if markers is empty.
44 No part is created if markers is empty.
45 Raises ValueError if the bundler doesn't support any known obsmarker format.
45 Raises ValueError if the bundler doesn't support any known obsmarker format.
46 """
46 """
47 if markers:
47 if markers:
48 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
48 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
49 version = obsolete.commonversion(remoteversions)
49 version = obsolete.commonversion(remoteversions)
50 if version is None:
50 if version is None:
51 raise ValueError('bundler does not support common obsmarker format')
51 raise ValueError('bundler does not support common obsmarker format')
52 stream = obsolete.encodemarkers(markers, True, version=version)
52 stream = obsolete.encodemarkers(markers, True, version=version)
53 return bundler.newpart('obsmarkers', data=stream)
53 return bundler.newpart('obsmarkers', data=stream)
54 return None
54 return None
55
55
56 def _canusebundle2(op):
56 def _canusebundle2(op):
57 """return true if a pull/push can use bundle2
57 """return true if a pull/push can use bundle2
58
58
59 Feel free to nuke this function when we drop the experimental option"""
59 Feel free to nuke this function when we drop the experimental option"""
60 return (op.repo.ui.configbool('experimental', 'bundle2-exp', False)
60 return (op.repo.ui.configbool('experimental', 'bundle2-exp', False)
61 and op.remote.capable('bundle2'))
61 and op.remote.capable('bundle2'))
62
62
63
63
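Both conditions must hold: the client-side experimental knob and the server advertising the `bundle2` capability. Enabling the knob in a client hgrc would look like this (standard config syntax; the value is read by the `configbool` call above):

    [experimental]
    bundle2-exp = True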
64 class pushoperation(object):
64 class pushoperation(object):
65 """A object that represent a single push operation
65 """A object that represent a single push operation
66
66
67 It purpose is to carry push related state and very common operation.
67 It purpose is to carry push related state and very common operation.
68
68
69 A new one should be created at the beginning of each push and discarded
69 A new one should be created at the beginning of each push and discarded
70 afterward.
70 afterward.
71 """
71 """
72
72
73 def __init__(self, repo, remote, force=False, revs=None, newbranch=False,
73 def __init__(self, repo, remote, force=False, revs=None, newbranch=False,
74 bookmarks=()):
74 bookmarks=()):
75 # repo we push from
75 # repo we push from
76 self.repo = repo
76 self.repo = repo
77 self.ui = repo.ui
77 self.ui = repo.ui
78 # repo we push to
78 # repo we push to
79 self.remote = remote
79 self.remote = remote
80 # force option provided
80 # force option provided
81 self.force = force
81 self.force = force
82 # revs to be pushed (None is "all")
82 # revs to be pushed (None is "all")
83 self.revs = revs
83 self.revs = revs
84 # bookmarks explicitly pushed
84 # bookmarks explicitly pushed
85 self.bookmarks = bookmarks
85 self.bookmarks = bookmarks
86 # allow push of new branch
86 # allow push of new branch
87 self.newbranch = newbranch
87 self.newbranch = newbranch
88 # did a local lock get acquired?
88 # did a local lock get acquired?
89 self.locallocked = None
89 self.locallocked = None
90 # steps already performed
90 # steps already performed
91 # (used to check what steps have already been performed through bundle2)
91 # (used to check what steps have already been performed through bundle2)
92 self.stepsdone = set()
92 self.stepsdone = set()
93 # Integer version of the changegroup push result
93 # Integer version of the changegroup push result
94 # - None means nothing to push
94 # - None means nothing to push
95 # - 0 means HTTP error
95 # - 0 means HTTP error
96 # - 1 means we pushed and remote head count is unchanged *or*
96 # - 1 means we pushed and remote head count is unchanged *or*
97 # we have outgoing changesets but refused to push
97 # we have outgoing changesets but refused to push
98 # - other values as described by addchangegroup()
98 # - other values as described by addchangegroup()
99 self.cgresult = None
99 self.cgresult = None
100 # Boolean value for the bookmark push
100 # Boolean value for the bookmark push
101 self.bkresult = None
101 self.bkresult = None
102 # discover.outgoing object (contains common and outgoing data)
102 # discover.outgoing object (contains common and outgoing data)
103 self.outgoing = None
103 self.outgoing = None
104 # all remote heads before the push
104 # all remote heads before the push
105 self.remoteheads = None
105 self.remoteheads = None
106 # testable as a boolean indicating if any nodes are missing locally.
106 # testable as a boolean indicating if any nodes are missing locally.
107 self.incoming = None
107 self.incoming = None
108 # phases changes that must be pushed along side the changesets
108 # phases changes that must be pushed along side the changesets
109 self.outdatedphases = None
109 self.outdatedphases = None
110 # phases changes that must be pushed if changeset push fails
110 # phases changes that must be pushed if changeset push fails
111 self.fallbackoutdatedphases = None
111 self.fallbackoutdatedphases = None
112 # outgoing obsmarkers
112 # outgoing obsmarkers
113 self.outobsmarkers = set()
113 self.outobsmarkers = set()
114 # outgoing bookmarks
114 # outgoing bookmarks
115 self.outbookmarks = []
115 self.outbookmarks = []
116 # transaction manager
116 # transaction manager
117 self.trmanager = None
117 self.trmanager = None
118
118
119 @util.propertycache
119 @util.propertycache
120 def futureheads(self):
120 def futureheads(self):
121 """future remote heads if the changeset push succeeds"""
121 """future remote heads if the changeset push succeeds"""
122 return self.outgoing.missingheads
122 return self.outgoing.missingheads
123
123
124 @util.propertycache
124 @util.propertycache
125 def fallbackheads(self):
125 def fallbackheads(self):
126 """future remote heads if the changeset push fails"""
126 """future remote heads if the changeset push fails"""
127 if self.revs is None:
127 if self.revs is None:
128 # no target to push, all common heads are relevant
128 # no target to push, all common heads are relevant
129 return self.outgoing.commonheads
129 return self.outgoing.commonheads
130 unfi = self.repo.unfiltered()
130 unfi = self.repo.unfiltered()
131 # I want cheads = heads(::missingheads and ::commonheads)
131 # I want cheads = heads(::missingheads and ::commonheads)
132 # (missingheads is revs with secret changeset filtered out)
132 # (missingheads is revs with secret changeset filtered out)
133 #
133 #
134 # This can be expressed as:
134 # This can be expressed as:
135 # cheads = ( (missingheads and ::commonheads)
135 # cheads = ( (missingheads and ::commonheads)
136 # + (commonheads and ::missingheads))"
136 # + (commonheads and ::missingheads))"
137 # )
137 # )
138 #
138 #
139 # while trying to push we already computed the following:
139 # while trying to push we already computed the following:
140 # common = (::commonheads)
140 # common = (::commonheads)
141 # missing = ((commonheads::missingheads) - commonheads)
141 # missing = ((commonheads::missingheads) - commonheads)
142 #
142 #
143 # We can pick:
143 # We can pick:
144 # * missingheads part of common (::commonheads)
144 # * missingheads part of common (::commonheads)
145 common = set(self.outgoing.common)
145 common = set(self.outgoing.common)
146 nm = self.repo.changelog.nodemap
146 nm = self.repo.changelog.nodemap
147 cheads = [node for node in self.revs if nm[node] in common]
147 cheads = [node for node in self.revs if nm[node] in common]
148 # and
148 # and
149 # * commonheads parents on missing
149 # * commonheads parents on missing
150 revset = unfi.set('%ln and parents(roots(%ln))',
150 revset = unfi.set('%ln and parents(roots(%ln))',
151 self.outgoing.commonheads,
151 self.outgoing.commonheads,
152 self.outgoing.missing)
152 self.outgoing.missing)
153 cheads.extend(c.node() for c in revset)
153 cheads.extend(c.node() for c in revset)
154 return cheads
154 return cheads
155
155
156 @property
156 @property
157 def commonheads(self):
157 def commonheads(self):
158 """set of all common heads after changeset bundle push"""
158 """set of all common heads after changeset bundle push"""
159 if self.cgresult:
159 if self.cgresult:
160 return self.futureheads
160 return self.futureheads
161 else:
161 else:
162 return self.fallbackheads
162 return self.fallbackheads
163
163
164 # mapping of message used when pushing bookmark
164 # mapping of message used when pushing bookmark
165 bookmsgmap = {'update': (_("updating bookmark %s\n"),
165 bookmsgmap = {'update': (_("updating bookmark %s\n"),
166 _('updating bookmark %s failed!\n')),
166 _('updating bookmark %s failed!\n')),
167 'export': (_("exporting bookmark %s\n"),
167 'export': (_("exporting bookmark %s\n"),
168 _('exporting bookmark %s failed!\n')),
168 _('exporting bookmark %s failed!\n')),
169 'delete': (_("deleting remote bookmark %s\n"),
169 'delete': (_("deleting remote bookmark %s\n"),
170 _('deleting remote bookmark %s failed!\n')),
170 _('deleting remote bookmark %s failed!\n')),
171 }
171 }
172
172
173
173
174 def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=()):
174 def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=()):
175 '''Push outgoing changesets (limited by revs) from a local
175 '''Push outgoing changesets (limited by revs) from a local
176 repository to remote. Return an integer:
176 repository to remote. Return an integer:
177 - None means nothing to push
177 - None means nothing to push
178 - 0 means HTTP error
178 - 0 means HTTP error
179 - 1 means we pushed and remote head count is unchanged *or*
179 - 1 means we pushed and remote head count is unchanged *or*
180 we have outgoing changesets but refused to push
180 we have outgoing changesets but refused to push
181 - other values as described by addchangegroup()
181 - other values as described by addchangegroup()
182 '''
182 '''
183 pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks)
183 pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks)
184 if pushop.remote.local():
184 if pushop.remote.local():
185 missing = (set(pushop.repo.requirements)
185 missing = (set(pushop.repo.requirements)
186 - pushop.remote.local().supported)
186 - pushop.remote.local().supported)
187 if missing:
187 if missing:
188 msg = _("required features are not"
188 msg = _("required features are not"
189 " supported in the destination:"
189 " supported in the destination:"
190 " %s") % (', '.join(sorted(missing)))
190 " %s") % (', '.join(sorted(missing)))
191 raise util.Abort(msg)
191 raise util.Abort(msg)
192
192
193 # there are two ways to push to remote repo:
193 # there are two ways to push to remote repo:
194 #
194 #
195 # addchangegroup assumes local user can lock remote
195 # addchangegroup assumes local user can lock remote
196 # repo (local filesystem, old ssh servers).
196 # repo (local filesystem, old ssh servers).
197 #
197 #
198 # unbundle assumes local user cannot lock remote repo (new ssh
198 # unbundle assumes local user cannot lock remote repo (new ssh
199 # servers, http servers).
199 # servers, http servers).
200
200
201 if not pushop.remote.canpush():
201 if not pushop.remote.canpush():
202 raise util.Abort(_("destination does not support push"))
202 raise util.Abort(_("destination does not support push"))
203 # get local lock as we might write phase data
203 # get local lock as we might write phase data
204 localwlock = locallock = None
204 localwlock = locallock = None
205 try:
205 try:
206 # bundle2 push may receive a reply bundle touching bookmarks or other
206 # bundle2 push may receive a reply bundle touching bookmarks or other
207 # things requiring the wlock. Take it now to ensure proper ordering.
207 # things requiring the wlock. Take it now to ensure proper ordering.
208 maypushback = pushop.ui.configbool('experimental', 'bundle2.pushback')
208 maypushback = pushop.ui.configbool('experimental', 'bundle2.pushback')
209 if _canusebundle2(pushop) and maypushback:
209 if _canusebundle2(pushop) and maypushback:
210 localwlock = pushop.repo.wlock()
210 localwlock = pushop.repo.wlock()
211 locallock = pushop.repo.lock()
211 locallock = pushop.repo.lock()
212 pushop.locallocked = True
212 pushop.locallocked = True
213 except IOError, err:
213 except IOError, err:
214 pushop.locallocked = False
214 pushop.locallocked = False
215 if err.errno != errno.EACCES:
215 if err.errno != errno.EACCES:
216 raise
216 raise
217 # source repo cannot be locked.
217 # source repo cannot be locked.
218 # We do not abort the push, but just disable the local phase
218 # We do not abort the push, but just disable the local phase
219 # synchronisation.
219 # synchronisation.
220 msg = 'cannot lock source repository: %s\n' % err
220 msg = 'cannot lock source repository: %s\n' % err
221 pushop.ui.debug(msg)
221 pushop.ui.debug(msg)
222 try:
222 try:
223 if pushop.locallocked:
223 if pushop.locallocked:
224 pushop.trmanager = transactionmanager(repo,
224 pushop.trmanager = transactionmanager(repo,
225 'push-response',
225 'push-response',
226 pushop.remote.url())
226 pushop.remote.url())
227 pushop.repo.checkpush(pushop)
227 pushop.repo.checkpush(pushop)
228 lock = None
228 lock = None
229 unbundle = pushop.remote.capable('unbundle')
229 unbundle = pushop.remote.capable('unbundle')
230 if not unbundle:
230 if not unbundle:
231 lock = pushop.remote.lock()
231 lock = pushop.remote.lock()
232 try:
232 try:
233 _pushdiscovery(pushop)
233 _pushdiscovery(pushop)
234 if _canusebundle2(pushop):
234 if _canusebundle2(pushop):
235 _pushbundle2(pushop)
235 _pushbundle2(pushop)
236 _pushchangeset(pushop)
236 _pushchangeset(pushop)
237 _pushsyncphase(pushop)
237 _pushsyncphase(pushop)
238 _pushobsolete(pushop)
238 _pushobsolete(pushop)
239 _pushbookmark(pushop)
239 _pushbookmark(pushop)
240 finally:
240 finally:
241 if lock is not None:
241 if lock is not None:
242 lock.release()
242 lock.release()
243 if pushop.trmanager:
243 if pushop.trmanager:
244 pushop.trmanager.close()
244 pushop.trmanager.close()
245 finally:
245 finally:
246 if pushop.trmanager:
246 if pushop.trmanager:
247 pushop.trmanager.release()
247 pushop.trmanager.release()
248 if locallock is not None:
248 if locallock is not None:
249 locallock.release()
249 locallock.release()
250 if localwlock is not None:
250 if localwlock is not None:
251 localwlock.release()
251 localwlock.release()
252
252
253 return pushop
253 return pushop
254
254
255 # list of steps to perform discovery before push
255 # list of steps to perform discovery before push
256 pushdiscoveryorder = []
256 pushdiscoveryorder = []
257
257
258 # Mapping between step name and function
258 # Mapping between step name and function
259 #
259 #
260 # This exists to help extensions wrap steps if necessary
260 # This exists to help extensions wrap steps if necessary
261 pushdiscoverymapping = {}
261 pushdiscoverymapping = {}
262
262
263 def pushdiscovery(stepname):
263 def pushdiscovery(stepname):
264 """decorator for function performing discovery before push
264 """decorator for function performing discovery before push
265
265
266 The function is added to the step -> function mapping and appended to the
266 The function is added to the step -> function mapping and appended to the
267 list of steps. Beware that decorated functions will be added in order (this
267 list of steps. Beware that decorated functions will be added in order (this
268 may matter).
268 may matter).
269
269
270 You can only use this decorator for a new step; if you want to wrap a step
270 You can only use this decorator for a new step; if you want to wrap a step
271 from an extension, change the pushdiscovery dictionary directly."""
271 from an extension, change the pushdiscovery dictionary directly."""
272 def dec(func):
272 def dec(func):
273 assert stepname not in pushdiscoverymapping
273 assert stepname not in pushdiscoverymapping
274 pushdiscoverymapping[stepname] = func
274 pushdiscoverymapping[stepname] = func
275 pushdiscoveryorder.append(stepname)
275 pushdiscoveryorder.append(stepname)
276 return func
276 return func
277 return dec
277 return dec
278
278
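A hypothetical extension registering an extra discovery step with this decorator; the step name `x-example` is made up, and the step runs after the built-in ones because decoration order controls execution order:

    from mercurial import exchange

    @exchange.pushdiscovery('x-example')
    def _pushdiscoveryexample(pushop):
        # runs once per push, after the built-in discovery steps
        pushop.ui.debug('example discovery step ran\n')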
279 def _pushdiscovery(pushop):
279 def _pushdiscovery(pushop):
280 """Run all discovery steps"""
280 """Run all discovery steps"""
281 for stepname in pushdiscoveryorder:
281 for stepname in pushdiscoveryorder:
282 step = pushdiscoverymapping[stepname]
282 step = pushdiscoverymapping[stepname]
283 step(pushop)
283 step(pushop)
284
284
285 @pushdiscovery('changeset')
285 @pushdiscovery('changeset')
286 def _pushdiscoverychangeset(pushop):
286 def _pushdiscoverychangeset(pushop):
287 """discover the changeset that need to be pushed"""
287 """discover the changeset that need to be pushed"""
288 fci = discovery.findcommonincoming
288 fci = discovery.findcommonincoming
289 commoninc = fci(pushop.repo, pushop.remote, force=pushop.force)
289 commoninc = fci(pushop.repo, pushop.remote, force=pushop.force)
290 common, inc, remoteheads = commoninc
290 common, inc, remoteheads = commoninc
291 fco = discovery.findcommonoutgoing
291 fco = discovery.findcommonoutgoing
292 outgoing = fco(pushop.repo, pushop.remote, onlyheads=pushop.revs,
292 outgoing = fco(pushop.repo, pushop.remote, onlyheads=pushop.revs,
293 commoninc=commoninc, force=pushop.force)
293 commoninc=commoninc, force=pushop.force)
294 pushop.outgoing = outgoing
294 pushop.outgoing = outgoing
295 pushop.remoteheads = remoteheads
295 pushop.remoteheads = remoteheads
296 pushop.incoming = inc
296 pushop.incoming = inc
297
297
298 @pushdiscovery('phase')
298 @pushdiscovery('phase')
299 def _pushdiscoveryphase(pushop):
299 def _pushdiscoveryphase(pushop):
300 """discover the phase that needs to be pushed
300 """discover the phase that needs to be pushed
301
301
302 (computed for both success and failure case for changesets push)"""
302 (computed for both success and failure case for changesets push)"""
303 outgoing = pushop.outgoing
303 outgoing = pushop.outgoing
304 unfi = pushop.repo.unfiltered()
304 unfi = pushop.repo.unfiltered()
305 remotephases = pushop.remote.listkeys('phases')
305 remotephases = pushop.remote.listkeys('phases')
306 publishing = remotephases.get('publishing', False)
306 publishing = remotephases.get('publishing', False)
307 ana = phases.analyzeremotephases(pushop.repo,
307 ana = phases.analyzeremotephases(pushop.repo,
308 pushop.fallbackheads,
308 pushop.fallbackheads,
309 remotephases)
309 remotephases)
310 pheads, droots = ana
310 pheads, droots = ana
311 extracond = ''
311 extracond = ''
312 if not publishing:
312 if not publishing:
313 extracond = ' and public()'
313 extracond = ' and public()'
314 revset = 'heads((%%ln::%%ln) %s)' % extracond
314 revset = 'heads((%%ln::%%ln) %s)' % extracond
315 # Get the list of all revs that are draft on the remote but public here.
315 # Get the list of all revs that are draft on the remote but public here.
316 # XXX Beware that the revset breaks if droots is not strictly
316 # XXX Beware that the revset breaks if droots is not strictly
317 # XXX roots; we may want to ensure it is, but that is costly
317 # XXX roots; we may want to ensure it is, but that is costly
318 fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
318 fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
319 if not outgoing.missing:
319 if not outgoing.missing:
320 future = fallback
320 future = fallback
321 else:
321 else:
322 # adds changeset we are going to push as draft
322 # adds changeset we are going to push as draft
323 #
323 #
324 # should not be necessary for a publishing server, but because of an
324 # should not be necessary for a publishing server, but because of an
325 # issue fixed in xxxxx we have to do it anyway.
325 # issue fixed in xxxxx we have to do it anyway.
326 fdroots = list(unfi.set('roots(%ln + %ln::)',
326 fdroots = list(unfi.set('roots(%ln + %ln::)',
327 outgoing.missing, droots))
327 outgoing.missing, droots))
328 fdroots = [f.node() for f in fdroots]
328 fdroots = [f.node() for f in fdroots]
329 future = list(unfi.set(revset, fdroots, pushop.futureheads))
329 future = list(unfi.set(revset, fdroots, pushop.futureheads))
330 pushop.outdatedphases = future
330 pushop.outdatedphases = future
331 pushop.fallbackoutdatedphases = fallback
331 pushop.fallbackoutdatedphases = fallback
332
332
333 @pushdiscovery('obsmarker')
333 @pushdiscovery('obsmarker')
334 def _pushdiscoveryobsmarkers(pushop):
334 def _pushdiscoveryobsmarkers(pushop):
335 if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
335 if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
336 and pushop.repo.obsstore
336 and pushop.repo.obsstore
337 and 'obsolete' in pushop.remote.listkeys('namespaces')):
337 and 'obsolete' in pushop.remote.listkeys('namespaces')):
338 repo = pushop.repo
338 repo = pushop.repo
339 # very naive computation that can be quite expensive on big repos.
339 # very naive computation that can be quite expensive on big repos.
340 # However: evolution is currently slow on them anyway.
340 # However: evolution is currently slow on them anyway.
341 nodes = (c.node() for c in repo.set('::%ln', pushop.futureheads))
341 nodes = (c.node() for c in repo.set('::%ln', pushop.futureheads))
342 pushop.outobsmarkers = pushop.repo.obsstore.relevantmarkers(nodes)
342 pushop.outobsmarkers = pushop.repo.obsstore.relevantmarkers(nodes)
343
343
344 @pushdiscovery('bookmarks')
344 @pushdiscovery('bookmarks')
345 def _pushdiscoverybookmarks(pushop):
345 def _pushdiscoverybookmarks(pushop):
346 ui = pushop.ui
346 ui = pushop.ui
347 repo = pushop.repo.unfiltered()
347 repo = pushop.repo.unfiltered()
348 remote = pushop.remote
348 remote = pushop.remote
349 ui.debug("checking for updated bookmarks\n")
349 ui.debug("checking for updated bookmarks\n")
350 ancestors = ()
350 ancestors = ()
351 if pushop.revs:
351 if pushop.revs:
352 revnums = map(repo.changelog.rev, pushop.revs)
352 revnums = map(repo.changelog.rev, pushop.revs)
353 ancestors = repo.changelog.ancestors(revnums, inclusive=True)
353 ancestors = repo.changelog.ancestors(revnums, inclusive=True)
354 remotebookmark = remote.listkeys('bookmarks')
354 remotebookmark = remote.listkeys('bookmarks')
355
355
356 explicit = set(pushop.bookmarks)
356 explicit = set(pushop.bookmarks)
357
357
358 comp = bookmod.compare(repo, repo._bookmarks, remotebookmark, srchex=hex)
358 comp = bookmod.compare(repo, repo._bookmarks, remotebookmark, srchex=hex)
359 addsrc, adddst, advsrc, advdst, diverge, differ, invalid, same = comp
359 addsrc, adddst, advsrc, advdst, diverge, differ, invalid, same = comp
360 for b, scid, dcid in advsrc:
360 for b, scid, dcid in advsrc:
361 if b in explicit:
361 if b in explicit:
362 explicit.remove(b)
362 explicit.remove(b)
363 if not ancestors or repo[scid].rev() in ancestors:
363 if not ancestors or repo[scid].rev() in ancestors:
364 pushop.outbookmarks.append((b, dcid, scid))
364 pushop.outbookmarks.append((b, dcid, scid))
365 # search added bookmark
365 # search added bookmark
366 for b, scid, dcid in addsrc:
366 for b, scid, dcid in addsrc:
367 if b in explicit:
367 if b in explicit:
368 explicit.remove(b)
368 explicit.remove(b)
369 pushop.outbookmarks.append((b, '', scid))
369 pushop.outbookmarks.append((b, '', scid))
370 # search for overwritten bookmark
370 # search for overwritten bookmark
371 for b, scid, dcid in advdst + diverge + differ:
371 for b, scid, dcid in advdst + diverge + differ:
372 if b in explicit:
372 if b in explicit:
373 explicit.remove(b)
373 explicit.remove(b)
374 pushop.outbookmarks.append((b, dcid, scid))
374 pushop.outbookmarks.append((b, dcid, scid))
375 # search for bookmark to delete
375 # search for bookmark to delete
376 for b, scid, dcid in adddst:
376 for b, scid, dcid in adddst:
377 if b in explicit:
377 if b in explicit:
378 explicit.remove(b)
378 explicit.remove(b)
379 # treat as "deleted locally"
379 # treat as "deleted locally"
380 pushop.outbookmarks.append((b, dcid, ''))
380 pushop.outbookmarks.append((b, dcid, ''))
381 # identical bookmarks shouldn't get reported
381 # identical bookmarks shouldn't get reported
382 for b, scid, dcid in same:
382 for b, scid, dcid in same:
383 if b in explicit:
383 if b in explicit:
384 explicit.remove(b)
384 explicit.remove(b)
385
385
386 if explicit:
386 if explicit:
387 explicit = sorted(explicit)
387 explicit = sorted(explicit)
388 # we should probably list all of them
388 # we should probably list all of them
389 ui.warn(_('bookmark %s does not exist on the local '
389 ui.warn(_('bookmark %s does not exist on the local '
390 'or remote repository!\n') % explicit[0])
390 'or remote repository!\n') % explicit[0])
391 pushop.bkresult = 2
391 pushop.bkresult = 2
392
392
393 pushop.outbookmarks.sort()
393 pushop.outbookmarks.sort()
394
394
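`bookmod.compare` buckets bookmarks into the eight categories unpacked above. A toy standalone illustration of three of those buckets using plain dicts instead of the real API (names and hashes are fabricated):

    local = {'feature': 'aaa', 'stable': 'bbb'}
    remote = {'stable': 'bbb', 'old': 'ccc'}

    addsrc = [b for b in local if b not in remote]   # only local: export
    adddst = [b for b in remote if b not in local]   # only remote: delete
    same = [b for b in local if remote.get(b) == local[b]]

    assert set(addsrc) == {'feature'}
    assert set(adddst) == {'old'}
    assert set(same) == {'stable'}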
395 def _pushcheckoutgoing(pushop):
395 def _pushcheckoutgoing(pushop):
396 outgoing = pushop.outgoing
396 outgoing = pushop.outgoing
397 unfi = pushop.repo.unfiltered()
397 unfi = pushop.repo.unfiltered()
398 if not outgoing.missing:
398 if not outgoing.missing:
399 # nothing to push
399 # nothing to push
400 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
400 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
401 return False
401 return False
402 # something to push
402 # something to push
403 if not pushop.force:
403 if not pushop.force:
404 # if repo.obsstore == False --> no obsolete
404 # if repo.obsstore == False --> no obsolete
405 # then, save the iteration
405 # then, save the iteration
406 if unfi.obsstore:
406 if unfi.obsstore:
407 # this message are here for 80 char limit reason
407 # this message are here for 80 char limit reason
408 mso = _("push includes obsolete changeset: %s!")
408 mso = _("push includes obsolete changeset: %s!")
409 mst = {"unstable": _("push includes unstable changeset: %s!"),
409 mst = {"unstable": _("push includes unstable changeset: %s!"),
410 "bumped": _("push includes bumped changeset: %s!"),
410 "bumped": _("push includes bumped changeset: %s!"),
411 "divergent": _("push includes divergent changeset: %s!")}
411 "divergent": _("push includes divergent changeset: %s!")}
412 # If we are to push if there is at least one
412 # If we are to push if there is at least one
413 # obsolete or unstable changeset in missing, at
413 # obsolete or unstable changeset in missing, at
414 # least one of the missinghead will be obsolete or
414 # least one of the missinghead will be obsolete or
415 # unstable. So checking heads only is ok
415 # unstable. So checking heads only is ok
416 for node in outgoing.missingheads:
416 for node in outgoing.missingheads:
417 ctx = unfi[node]
417 ctx = unfi[node]
418 if ctx.obsolete():
418 if ctx.obsolete():
419 raise util.Abort(mso % ctx)
419 raise util.Abort(mso % ctx)
420 elif ctx.troubled():
420 elif ctx.troubled():
421 raise util.Abort(mst[ctx.troubles()[0]] % ctx)
421 raise util.Abort(mst[ctx.troubles()[0]] % ctx)
422 newbm = pushop.ui.configlist('bookmarks', 'pushing')
422 newbm = pushop.ui.configlist('bookmarks', 'pushing')
423 discovery.checkheads(unfi, pushop.remote, outgoing,
423 discovery.checkheads(unfi, pushop.remote, outgoing,
424 pushop.remoteheads,
424 pushop.remoteheads,
425 pushop.newbranch,
425 pushop.newbranch,
426 bool(pushop.incoming),
426 bool(pushop.incoming),
427 newbm)
427 newbm)
428 return True
428 return True
429
429
430 # List of names of steps to perform for an outgoing bundle2, order matters.
430 # List of names of steps to perform for an outgoing bundle2, order matters.
431 b2partsgenorder = []
431 b2partsgenorder = []
432
432
433 # Mapping between step name and function
433 # Mapping between step name and function
434 #
434 #
435 # This exists to help extensions wrap steps if necessary
435 # This exists to help extensions wrap steps if necessary
436 b2partsgenmapping = {}
436 b2partsgenmapping = {}
437
437
438 def b2partsgenerator(stepname, idx=None):
438 def b2partsgenerator(stepname, idx=None):
439 """decorator for function generating bundle2 part
439 """decorator for function generating bundle2 part
440
440
441 The function is added to the step -> function mapping and appended to the
441 The function is added to the step -> function mapping and appended to the
442 list of steps. Beware that decorated functions will be added in order
442 list of steps. Beware that decorated functions will be added in order
443 (this may matter).
443 (this may matter).
444
444
445 You can only use this decorator for new steps, if you want to wrap a step
445 You can only use this decorator for new steps, if you want to wrap a step
446 from an extension, attack the b2partsgenmapping dictionary directly."""
446 from an extension, attack the b2partsgenmapping dictionary directly."""
447 def dec(func):
447 def dec(func):
448 assert stepname not in b2partsgenmapping
448 assert stepname not in b2partsgenmapping
449 b2partsgenmapping[stepname] = func
449 b2partsgenmapping[stepname] = func
450 if idx is None:
450 if idx is None:
451 b2partsgenorder.append(stepname)
451 b2partsgenorder.append(stepname)
452 else:
452 else:
453 b2partsgenorder.insert(idx, stepname)
453 b2partsgenorder.insert(idx, stepname)
454 return func
454 return func
455 return dec
455 return dec
456
456
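# Editor's sketch (not part of the original module): how an extension might
# use the decorator above. The step name 'myext-note' and the part type
# 'myext:note' are hypothetical.
#
#     @b2partsgenerator('myext-note')
#     def _pushb2note(pushop, bundler):
#         if 'myext-note' in pushop.stepsdone:
#             return
#         pushop.stepsdone.add('myext-note')
#         bundler.newpart('myext:note', data='some payload')
#
# To wrap an existing step from an extension, swap the mapping entry instead
# of re-registering the name:
#
#     origgen = b2partsgenmapping['changeset']
#     def wrappedgen(pushop, bundler):
#         pushop.ui.debug('about to generate the changeset part\n')
#         return origgen(pushop, bundler)
#     b2partsgenmapping['changeset'] = wrappedgen
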
@b2partsgenerator('changeset')
def _pushb2ctx(pushop, bundler):
    """handle changegroup push through bundle2

    addchangegroup result is stored in the ``pushop.cgresult`` attribute.
    """
    if 'changesets' in pushop.stepsdone:
        return
    pushop.stepsdone.add('changesets')
    # Send known heads to the server for race detection.
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop.repo,
                                     pushop.remote,
                                     pushop.outgoing)
    if not pushop.force:
        bundler.newpart('check:heads', data=iter(pushop.remoteheads))
    b2caps = bundle2.bundle2caps(pushop.remote)
    version = None
    cgversions = b2caps.get('changegroup')
    if not cgversions:  # 3.1 and 3.2 ship with an empty value
        cg = changegroup.getlocalchangegroupraw(pushop.repo, 'push',
                                                pushop.outgoing)
    else:
        cgversions = [v for v in cgversions if v in changegroup.packermap]
        if not cgversions:
            raise ValueError(_('no common changegroup version'))
        version = max(cgversions)
        cg = changegroup.getlocalchangegroupraw(pushop.repo, 'push',
                                                pushop.outgoing,
                                                version=version)
    cgpart = bundler.newpart('changegroup', data=cg)
    if version is not None:
        cgpart.addparam('version', version)
    def handlereply(op):
        """extract addchangegroup returns from server reply"""
        cgreplies = op.records.getreplies(cgpart.id)
        assert len(cgreplies['changegroup']) == 1
        pushop.cgresult = cgreplies['changegroup'][0]['return']
    return handlereply

@b2partsgenerator('phase')
def _pushb2phases(pushop, bundler):
    """handle phase push through bundle2"""
    if 'phases' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    if 'pushkey' not in b2caps:
        return
    pushop.stepsdone.add('phases')
    part2node = []
    enc = pushkey.encode
    for newremotehead in pushop.outdatedphases:
        part = bundler.newpart('pushkey')
        part.addparam('namespace', enc('phases'))
        part.addparam('key', enc(newremotehead.hex()))
        part.addparam('old', enc(str(phases.draft)))
        part.addparam('new', enc(str(phases.public)))
        part2node.append((part.id, newremotehead))
    def handlereply(op):
        for partid, node in part2node:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            assert len(results) <= 1
            msg = None
            if not results:
                msg = _('server ignored update of %s to public!\n') % node
            elif not int(results[0]['return']):
                msg = _('updating %s to public failed!\n') % node
            if msg is not None:
                pushop.ui.warn(msg)
    return handlereply

@b2partsgenerator('obsmarkers')
def _pushb2obsmarkers(pushop, bundler):
    if 'obsmarkers' in pushop.stepsdone:
        return
    remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
    if obsolete.commonversion(remoteversions) is None:
        return
    pushop.stepsdone.add('obsmarkers')
    if pushop.outobsmarkers:
        buildobsmarkerspart(bundler, pushop.outobsmarkers)

@b2partsgenerator('bookmarks')
def _pushb2bookmarks(pushop, bundler):
    """handle bookmark push through bundle2"""
    if 'bookmarks' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    if 'pushkey' not in b2caps:
        return
    pushop.stepsdone.add('bookmarks')
    part2book = []
    enc = pushkey.encode
    for book, old, new in pushop.outbookmarks:
        part = bundler.newpart('pushkey')
        part.addparam('namespace', enc('bookmarks'))
        part.addparam('key', enc(book))
        part.addparam('old', enc(old))
        part.addparam('new', enc(new))
        action = 'update'
        if not old:
            action = 'export'
        elif not new:
            action = 'delete'
        part2book.append((part.id, book, action))

    def handlereply(op):
        ui = pushop.ui
        for partid, book, action in part2book:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            assert len(results) <= 1
            if not results:
                pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
            else:
                ret = int(results[0]['return'])
                if ret:
                    ui.status(bookmsgmap[action][0] % book)
                else:
                    ui.warn(bookmsgmap[action][1] % book)
                if pushop.bkresult is not None:
                    pushop.bkresult = 1
    return handlereply

def _pushbundle2(pushop):
    """push data to the remote using bundle2

    The only currently supported type of data is the changegroup, but this
    will evolve in the future."""
    bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
    pushback = (pushop.trmanager
                and pushop.ui.configbool('experimental', 'bundle2.pushback'))

    # create reply capability
    capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo,
                                                      allowpushback=pushback))
    bundler.newpart('replycaps', data=capsblob)
    replyhandlers = []
    for partgenname in b2partsgenorder:
        partgen = b2partsgenmapping[partgenname]
        ret = partgen(pushop, bundler)
        if callable(ret):
            replyhandlers.append(ret)
    # do not push if nothing to push
    if bundler.nbparts <= 1:
        return
    stream = util.chunkbuffer(bundler.getchunks())
    try:
        reply = pushop.remote.unbundle(stream, ['force'], 'push')
    except error.BundleValueError, exc:
        raise util.Abort('missing support for %s' % exc)
    try:
        trgetter = None
        if pushback:
            trgetter = pushop.trmanager.transaction
        op = bundle2.processbundle(pushop.repo, reply, trgetter)
    except error.BundleValueError, exc:
        raise util.Abort('missing support for %s' % exc)
    for rephand in replyhandlers:
        rephand(op)

def _pushchangeset(pushop):
    """Make the actual push of changeset bundle to remote repo"""
    if 'changesets' in pushop.stepsdone:
        return
    pushop.stepsdone.add('changesets')
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop.repo,
                                     pushop.remote,
                                     pushop.outgoing)
    outgoing = pushop.outgoing
    unbundle = pushop.remote.capable('unbundle')
    # TODO: get bundlecaps from remote
    bundlecaps = None
    # create a changegroup from local
    if pushop.revs is None and not (outgoing.excluded
                                    or pushop.repo.changelog.filteredrevs):
        # push everything,
        # use the fast path, no race possible on push
        bundler = changegroup.cg1packer(pushop.repo, bundlecaps)
        cg = changegroup.getsubset(pushop.repo,
                                   outgoing,
                                   bundler,
                                   'push',
                                   fastpath=True)
    else:
        cg = changegroup.getlocalchangegroup(pushop.repo, 'push', outgoing,
                                             bundlecaps)

    # apply changegroup to remote
    if unbundle:
        # local repo finds heads on server, finds out what
        # revs it must push. once revs transferred, if server
        # finds it has different heads (someone else won
        # commit/push race), server aborts.
        if pushop.force:
            remoteheads = ['force']
        else:
            remoteheads = pushop.remoteheads
        # ssh: return remote's addchangegroup()
        # http: return remote's addchangegroup() or 0 for error
        pushop.cgresult = pushop.remote.unbundle(cg, remoteheads,
                                                 pushop.repo.url())
    else:
        # we return an integer indicating remote head count change
        pushop.cgresult = pushop.remote.addchangegroup(cg, 'push',
                                                       pushop.repo.url())

def _pushsyncphase(pushop):
    """synchronise phase information locally and remotely"""
    cheads = pushop.commonheads
    # even when we don't push, exchanging phase data is useful
    remotephases = pushop.remote.listkeys('phases')
    if (pushop.ui.configbool('ui', '_usedassubrepo', False)
        and remotephases    # server supports phases
        and pushop.cgresult is None # nothing was pushed
        and remotephases.get('publishing', False)):
        # When:
        # - this is a subrepo push
        # - and the remote supports phases
        # - and no changeset was pushed
        # - and the remote is publishing
        # we may be in the issue 3871 case!
        # We skip the phase synchronisation that would otherwise, as a
        # courtesy, publish changesets that are possibly still draft on
        # the remote.
        remotephases = {'publishing': 'True'}
    if not remotephases: # old server, or public-only reply from non-publishing
        _localphasemove(pushop, cheads)
        # don't push any phase data as there is nothing to push
    else:
        ana = phases.analyzeremotephases(pushop.repo, cheads,
                                         remotephases)
        pheads, droots = ana
        ### Apply remote phase on local
        if remotephases.get('publishing', False):
            _localphasemove(pushop, cheads)
        else: # publish = False
            _localphasemove(pushop, pheads)
            _localphasemove(pushop, cheads, phases.draft)
        ### Apply local phase on remote

        if pushop.cgresult:
            if 'phases' in pushop.stepsdone:
                # phases already pushed through bundle2
                return
            outdated = pushop.outdatedphases
        else:
            outdated = pushop.fallbackoutdatedphases

        pushop.stepsdone.add('phases')

        # filter heads already turned public by the push
        outdated = [c for c in outdated if c.node() not in pheads]
        # fallback to independent pushkey command
        for newremotehead in outdated:
            r = pushop.remote.pushkey('phases',
                                      newremotehead.hex(),
                                      str(phases.draft),
                                      str(phases.public))
            if not r:
                pushop.ui.warn(_('updating %s to public failed!\n')
                               % newremotehead)

def _localphasemove(pushop, nodes, phase=phases.public):
    """move <nodes> to <phase> in the local source repo"""
    if pushop.trmanager:
        phases.advanceboundary(pushop.repo,
                               pushop.trmanager.transaction(),
                               phase,
                               nodes)
    else:
        # repo is not locked, do not change any phases!
        # Inform the user that phases should have been moved when
        # applicable.
        actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
        phasestr = phases.phasenames[phase]
        if actualmoves:
            pushop.ui.status(_('cannot lock source repo, skipping '
                               'local %s phase update\n') % phasestr)

def _pushobsolete(pushop):
    """utility function to push obsolete markers to a remote"""
    if 'obsmarkers' in pushop.stepsdone:
        return
    pushop.ui.debug('try to push obsolete markers to remote\n')
    repo = pushop.repo
    remote = pushop.remote
    pushop.stepsdone.add('obsmarkers')
    if pushop.outobsmarkers:
        rslts = []
        remotedata = obsolete._pushkeyescape(pushop.outobsmarkers)
        for key in sorted(remotedata, reverse=True):
            # reverse sort to ensure we end with dump0
            data = remotedata[key]
            rslts.append(remote.pushkey('obsolete', key, '', data))
        if [r for r in rslts if not r]:
            msg = _('failed to push some obsolete markers!\n')
            repo.ui.warn(msg)

def _pushbookmark(pushop):
    """Update bookmark position on remote"""
    if pushop.cgresult == 0 or 'bookmarks' in pushop.stepsdone:
        return
    pushop.stepsdone.add('bookmarks')
    ui = pushop.ui
    remote = pushop.remote

    for b, old, new in pushop.outbookmarks:
        action = 'update'
        if not old:
            action = 'export'
        elif not new:
            action = 'delete'
        if remote.pushkey('bookmarks', b, old, new):
            ui.status(bookmsgmap[action][0] % b)
        else:
            ui.warn(bookmsgmap[action][1] % b)
        # discovery may have set the value from an invalid entry
        if pushop.bkresult is not None:
            pushop.bkresult = 1

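# Editor's note: bookmsgmap is defined earlier in this module; it maps each
# action ('update', 'export', 'delete') to a (success, failure) message
# pair, each expecting the bookmark name through %s.
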
class pulloperation(object):
    """An object that represents a single pull operation

    Its purpose is to carry pull-related state and very common operations.

    A new one should be created at the beginning of each pull and discarded
    afterward.
    """

    def __init__(self, repo, remote, heads=None, force=False, bookmarks=()):
        # repo we pull into
        self.repo = repo
        # repo we pull from
        self.remote = remote
        # revision we try to pull (None is "all")
        self.heads = heads
        # bookmarks pulled explicitly
        self.explicitbookmarks = bookmarks
        # do we force pull?
        self.force = force
        # transaction manager
        self.trmanager = None
        # set of common changesets between local and remote before pull
        self.common = None
        # set of pulled heads
        self.rheads = None
        # list of missing changesets to fetch remotely
        self.fetch = None
        # remote bookmarks data
        self.remotebookmarks = None
        # result of changegroup pulling (used as return code by pull)
        self.cgresult = None
        # list of steps already done
        self.stepsdone = set()

    @util.propertycache
    def pulledsubset(self):
        """heads of the set of changesets targeted by the pull"""
        # compute target subset
        if self.heads is None:
            # We pulled everything possible
            # sync on everything common
            c = set(self.common)
            ret = list(self.common)
            for n in self.rheads:
                if n not in c:
                    ret.append(n)
            return ret
        else:
            # We pulled a specific subset
            # sync on this subset
            return self.heads

    def gettransaction(self):
        # deprecated; talk to trmanager directly
        return self.trmanager.transaction()

class transactionmanager(object):
    """An object to manage the life cycle of a transaction

    It creates the transaction on demand and calls the appropriate hooks when
    closing the transaction."""
    def __init__(self, repo, source, url):
        self.repo = repo
        self.source = source
        self.url = url
        self._tr = None

    def transaction(self):
        """Return an open transaction object, constructing if necessary"""
        if not self._tr:
            trname = '%s\n%s' % (self.source, util.hidepassword(self.url))
            self._tr = self.repo.transaction(trname)
            self._tr.hookargs['source'] = self.source
            self._tr.hookargs['url'] = self.url
        return self._tr

    def close(self):
        """close transaction if created"""
        if self._tr is not None:
            self._tr.close()

    def release(self):
        """release transaction if created"""
        if self._tr is not None:
            self._tr.release()

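# Editor's sketch (assumed usage, mirroring pull() below): the expected
# life cycle of a transactionmanager.
#
#     trmanager = transactionmanager(repo, 'pull', remote.url())
#     try:
#         tr = trmanager.transaction()  # lazily created on first use
#         # ... apply changes under the transaction ...
#         trmanager.close()    # commits, if a transaction was ever opened
#     finally:
#         trmanager.release()  # aborts, unless close() already ran
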
def pull(repo, remote, heads=None, force=False, bookmarks=()):
    pullop = pulloperation(repo, remote, heads, force, bookmarks=bookmarks)
    if pullop.remote.local():
        missing = set(pullop.remote.requirements) - pullop.repo.supported
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    pullop.remotebookmarks = remote.listkeys('bookmarks')
    lock = pullop.repo.lock()
    try:
        pullop.trmanager = transactionmanager(repo, 'pull', remote.url())
        _pulldiscovery(pullop)
        if _canusebundle2(pullop):
            _pullbundle2(pullop)
        _pullchangeset(pullop)
        _pullphase(pullop)
        _pullbookmarks(pullop)
        _pullobsolete(pullop)
        pullop.trmanager.close()
    finally:
        pullop.trmanager.release()
        lock.release()

    return pullop

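# Editor's sketch (illustrative only): driving pull() programmatically.
# hg.peer is the standard way to obtain a remote; the URL is made up.
#
#     from mercurial import hg
#     remote = hg.peer(repo.ui, {}, 'https://example.com/repo')
#     pullop = pull(repo, remote, heads=None, force=False)
#     repo.ui.status('pull returned %r\n' % pullop.cgresult)
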
# list of steps to perform discovery before pull
pulldiscoveryorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
pulldiscoverymapping = {}

def pulldiscovery(stepname):
    """decorator for a function performing discovery before pull

    The function is added to the step -> function mapping and appended to the
    list of steps. Beware that decorated functions will be added in order
    (this may matter).

    You can only use this decorator for a new step; if you want to wrap a step
    from an extension, change the pulldiscoverymapping dictionary directly."""
    def dec(func):
        assert stepname not in pulldiscoverymapping
        pulldiscoverymapping[stepname] = func
        pulldiscoveryorder.append(stepname)
        return func
    return dec

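# Editor's sketch (not part of the original module): registering an extra
# discovery step with the decorator above. The step name 'myext-extra' is
# hypothetical; wrapping an existing step goes through pulldiscoverymapping
# instead, as the docstring explains.
#
#     @pulldiscovery('myext-extra')
#     def _pulldiscoveryextra(pullop):
#         pullop.repo.ui.debug('running extra discovery\n')
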
def _pulldiscovery(pullop):
    """Run all discovery steps"""
    for stepname in pulldiscoveryorder:
        step = pulldiscoverymapping[stepname]
        step(pullop)

@pulldiscovery('changegroup')
def _pulldiscoverychangegroup(pullop):
    """discovery phase for the pull

    Currently handles changeset discovery only; will change to handle all
    discovery at some point."""
    tmp = discovery.findcommonincoming(pullop.repo,
                                       pullop.remote,
                                       heads=pullop.heads,
                                       force=pullop.force)
    common, fetch, rheads = tmp
    nm = pullop.repo.unfiltered().changelog.nodemap
    if fetch and rheads:
        # If a remote head is filtered locally, let's drop it from the
        # unknown remote heads and put it back in common.
        #
        # This is a hackish solution to catch most of the "common but
        # locally hidden" situations. We do not perform discovery on the
        # unfiltered repository because it ends up doing a pathological
        # amount of round trips for a huge amount of changesets we do not
        # care about.
        #
        # If a set of such "common but filtered" changesets exists on the
        # server but does not include a remote head, we'll not be able to
        # detect it.
        scommon = set(common)
        filteredrheads = []
        for n in rheads:
            if n in nm:
                if n not in scommon:
                    common.append(n)
            else:
                filteredrheads.append(n)
        if not filteredrheads:
            fetch = []
        rheads = filteredrheads
    pullop.common = common
    pullop.fetch = fetch
    pullop.rheads = rheads

def _pullbundle2(pullop):
    """pull data using bundle2

    For now, the only supported data is the changegroup."""
    remotecaps = bundle2.bundle2caps(pullop.remote)
    kwargs = {'bundlecaps': caps20to10(pullop.repo)}
    # pulling changegroup
    pullop.stepsdone.add('changegroup')

    kwargs['common'] = pullop.common
    kwargs['heads'] = pullop.heads or pullop.rheads
    kwargs['cg'] = pullop.fetch
    if 'listkeys' in remotecaps:
        kwargs['listkeys'] = ['phase', 'bookmarks']
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
    else:
        if pullop.heads is None and list(pullop.common) == [nullid]:
            pullop.repo.ui.status(_("requesting all changes\n"))
    if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
        remoteversions = bundle2.obsmarkersversion(remotecaps)
        if obsolete.commonversion(remoteversions) is not None:
            kwargs['obsmarkers'] = True
            pullop.stepsdone.add('obsmarkers')
    _pullbundle2extraprepare(pullop, kwargs)
    bundle = pullop.remote.getbundle('pull', **kwargs)
    try:
        op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
    except error.BundleValueError, exc:
        raise util.Abort('missing support for %s' % exc)

    if pullop.fetch:
        results = [cg['return'] for cg in op.records['changegroup']]
        pullop.cgresult = changegroup.combineresults(results)

    # processing phases change
    for namespace, value in op.records['listkeys']:
        if namespace == 'phases':
            _pullapplyphases(pullop, value)

    # processing bookmark update
    for namespace, value in op.records['listkeys']:
        if namespace == 'bookmarks':
            pullop.remotebookmarks = value
            _pullbookmarks(pullop)

def _pullbundle2extraprepare(pullop, kwargs):
    """hook function so that extensions can extend the getbundle call"""
    pass

def _pullchangeset(pullop):
    """pull changesets from unbundle into the local repo"""
    # We delay opening the transaction as long as possible so we don't
    # open a transaction for nothing and break future useful rollback
    # calls.
    if 'changegroup' in pullop.stepsdone:
        return
    pullop.stepsdone.add('changegroup')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
        return
    pullop.gettransaction()
    if pullop.heads is None and list(pullop.common) == [nullid]:
        pullop.repo.ui.status(_("requesting all changes\n"))
    elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
        # issue1320, avoid a race if remote changed after discovery
        pullop.heads = pullop.rheads

    if pullop.remote.capable('getbundle'):
        # TODO: get bundlecaps from remote
        cg = pullop.remote.getbundle('pull', common=pullop.common,
                                     heads=pullop.heads or pullop.rheads)
    elif pullop.heads is None:
        cg = pullop.remote.changegroup(pullop.fetch, 'pull')
    elif not pullop.remote.capable('changegroupsubset'):
        raise util.Abort(_("partial pull cannot be done because "
                           "other repository doesn't support "
                           "changegroupsubset."))
    else:
        cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
    pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
                                                 pullop.remote.url())

def _pullphase(pullop):
    # Get phase data from the remote
    if 'phases' in pullop.stepsdone:
        return
    remotephases = pullop.remote.listkeys('phases')
    _pullapplyphases(pullop, remotephases)

def _pullapplyphases(pullop, remotephases):
    """apply phase movement from observed remote state"""
    if 'phases' in pullop.stepsdone:
        return
    pullop.stepsdone.add('phases')
    publishing = bool(remotephases.get('publishing', False))
    if remotephases and not publishing:
        # remote is new and non-publishing
        pheads, _dr = phases.analyzeremotephases(pullop.repo,
                                                 pullop.pulledsubset,
                                                 remotephases)
        dheads = pullop.pulledsubset
    else:
        # Remote is old or publishing; all common changesets
        # should be seen as public
        pheads = pullop.pulledsubset
        dheads = []
    unfi = pullop.repo.unfiltered()
    phase = unfi._phasecache.phase
    rev = unfi.changelog.nodemap.get
    public = phases.public
    draft = phases.draft

    # exclude changesets already public locally and update the others
    pheads = [pn for pn in pheads if phase(unfi, rev(pn)) > public]
    if pheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, public, pheads)

    # exclude changesets already draft locally and update the others
    dheads = [pn for pn in dheads if phase(unfi, rev(pn)) > draft]
    if dheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, draft, dheads)

def _pullbookmarks(pullop):
    """process the remote bookmark information to update the local one"""
    if 'bookmarks' in pullop.stepsdone:
        return
    pullop.stepsdone.add('bookmarks')
    repo = pullop.repo
    remotebookmarks = pullop.remotebookmarks
    bookmod.updatefromremote(repo.ui, repo, remotebookmarks,
                             pullop.remote.url(),
                             pullop.gettransaction,
                             explicit=pullop.explicitbookmarks)

def _pullobsolete(pullop):
    """utility function to pull obsolete markers from a remote

    The `gettransaction` function returns the pull transaction, creating one
    if necessary. We return the transaction to inform the calling code that
    a new transaction has been created (when applicable).

    Exists mostly to allow overriding for experimentation purposes"""
    if 'obsmarkers' in pullop.stepsdone:
        return
    pullop.stepsdone.add('obsmarkers')
    tr = None
    if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
        pullop.repo.ui.debug('fetching remote obsolete markers\n')
        remoteobs = pullop.remote.listkeys('obsolete')
        if 'dump0' in remoteobs:
            tr = pullop.gettransaction()
            for key in sorted(remoteobs, reverse=True):
                if key.startswith('dump'):
                    data = base85.b85decode(remoteobs[key])
                    pullop.repo.obsstore.mergemarkers(tr, data)
            pullop.repo.invalidatevolatilesets()
    return tr

def caps20to10(repo):
    """return a set with appropriate options to use bundle20 during getbundle"""
    caps = set(['HG20'])
    capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
    caps.add('bundle2=' + urllib.quote(capsblob))
    return caps

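# Editor's note: the returned set looks roughly like this (blob abbreviated,
# values illustrative):
#
#     set(['HG20', 'bundle2=HG20%0Achangegroup%3D01%2C02...'])
#
# i.e. the HG20 marker plus the urlquoted bundle2 capabilities blob, ready
# to be passed as the bundlecaps of a getbundle call.
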
# List of names of steps to perform for a bundle2 for getbundle, order matters.
getbundle2partsorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
getbundle2partsmapping = {}

def getbundle2partsgenerator(stepname, idx=None):
    """decorator for a function generating a bundle2 part for getbundle

    The function is added to the step -> function mapping and appended to the
    list of steps. Beware that decorated functions will be added in order
    (this may matter).

    You can only use this decorator for new steps; if you want to wrap a step
    from an extension, modify the getbundle2partsmapping dictionary directly."""
    def dec(func):
        assert stepname not in getbundle2partsmapping
        getbundle2partsmapping[stepname] = func
        if idx is None:
            getbundle2partsorder.append(stepname)
        else:
            getbundle2partsorder.insert(idx, stepname)
        return func
    return dec

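# Editor's sketch (not part of the original module): a hypothetical extra
# part generator hooked into getbundle. The step name, part type, and the
# 'clock' request argument are all made up.
#
#     @getbundle2partsgenerator('myext-clock')
#     def _getbundleclockpart(bundler, repo, source, bundlecaps=None,
#                             b2caps=None, **kwargs):
#         if kwargs.get('clock', False):
#             bundler.newpart('myext:clock', data='tick')
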
1165 def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
1165 def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
1166 **kwargs):
1166 **kwargs):
1167 """return a full bundle (with potentially multiple kind of parts)
1167 """return a full bundle (with potentially multiple kind of parts)
1168
1168
1169 Could be a bundle HG10 or a bundle HG20 depending on bundlecaps
1169 Could be a bundle HG10 or a bundle HG20 depending on bundlecaps
1170 passed. For now, the bundle can contain only changegroup, but this will
1170 passed. For now, the bundle can contain only changegroup, but this will
1171 changes when more part type will be available for bundle2.
1171 changes when more part type will be available for bundle2.
1172
1172
1173 This is different from changegroup.getchangegroup that only returns an HG10
1173 This is different from changegroup.getchangegroup that only returns an HG10
1174 changegroup bundle. They may eventually get reunited in the future when we
1174 changegroup bundle. They may eventually get reunited in the future when we
1175 have a clearer idea of the API we what to query different data.
1175 have a clearer idea of the API we what to query different data.
1176
1176
1177 The implementation is at a very early stage and will get massive rework
1177 The implementation is at a very early stage and will get massive rework
1178 when the API of bundle is refined.
1178 when the API of bundle is refined.
1179 """
1179 """
1180 # bundle10 case
1180 # bundle10 case
1181 usebundle2 = False
1181 usebundle2 = False
1182 if bundlecaps is not None:
1182 if bundlecaps is not None:
1183 usebundle2 = util.any((cap.startswith('HG2') for cap in bundlecaps))
1183 usebundle2 = util.any((cap.startswith('HG2') for cap in bundlecaps))
1184 if not usebundle2:
1184 if not usebundle2:
1185 if bundlecaps and not kwargs.get('cg', True):
1185 if bundlecaps and not kwargs.get('cg', True):
1186 raise ValueError(_('request for bundle10 must include changegroup'))
1186 raise ValueError(_('request for bundle10 must include changegroup'))
1187
1187
1188 if kwargs:
1188 if kwargs:
1189 raise ValueError(_('unsupported getbundle arguments: %s')
1189 raise ValueError(_('unsupported getbundle arguments: %s')
1190 % ', '.join(sorted(kwargs.keys())))
1190 % ', '.join(sorted(kwargs.keys())))
1191 return changegroup.getchangegroup(repo, source, heads=heads,
1191 return changegroup.getchangegroup(repo, source, heads=heads,
1192 common=common, bundlecaps=bundlecaps)
1192 common=common, bundlecaps=bundlecaps)
1193
1193
1194 # bundle20 case
1194 # bundle20 case
1195 b2caps = {}
1195 b2caps = {}
1196 for bcaps in bundlecaps:
1196 for bcaps in bundlecaps:
1197 if bcaps.startswith('bundle2='):
1197 if bcaps.startswith('bundle2='):
1198 blob = urllib.unquote(bcaps[len('bundle2='):])
1198 blob = urllib.unquote(bcaps[len('bundle2='):])
1199 b2caps.update(bundle2.decodecaps(blob))
1199 b2caps.update(bundle2.decodecaps(blob))
1200 bundler = bundle2.bundle20(repo.ui, b2caps)
1200 bundler = bundle2.bundle20(repo.ui, b2caps)
1201
1201
1202 kwargs['heads'] = heads
1202 kwargs['heads'] = heads
1203 kwargs['common'] = common
1203 kwargs['common'] = common
1204
1204
    for name in getbundle2partsorder:
        func = getbundle2partsmapping[name]
        func(bundler, repo, source, bundlecaps=bundlecaps, b2caps=b2caps,
             **kwargs)

    return util.chunkbuffer(bundler.getchunks())

@getbundle2partsgenerator('changegroup')
def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
                              b2caps=None, heads=None, common=None, **kwargs):
    """add a changegroup part to the requested bundle"""
    cg = None
    if kwargs.get('cg', True):
        # build changegroup bundle here.
        version = None
        cgversions = b2caps.get('changegroup')
        if not cgversions: # 3.1 and 3.2 ship with an empty value
            cg = changegroup.getchangegroupraw(repo, source, heads=heads,
                                               common=common,
                                               bundlecaps=bundlecaps)
        else:
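            # Version negotiation: keep only the versions this side can pack
            # (changegroup.packermap) and pick the highest common one.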
            cgversions = [v for v in cgversions if v in changegroup.packermap]
            if not cgversions:
                raise ValueError(_('no common changegroup version'))
            version = max(cgversions)
            cg = changegroup.getchangegroupraw(repo, source, heads=heads,
                                               common=common,
                                               bundlecaps=bundlecaps,
                                               version=version)

    if cg:
        part = bundler.newpart('changegroup', data=cg)
        if version is not None:
            part.addparam('version', version)

@getbundle2partsgenerator('listkeys')
def _getbundlelistkeysparts(bundler, repo, source, bundlecaps=None,
                            b2caps=None, **kwargs):
    """add parts containing listkeys namespaces to the requested bundle"""
    listkeys = kwargs.get('listkeys', ())
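    # One part per requested namespace; 'phases' and 'bookmarks' are typical
    # pushkey namespaces (named here for illustration only).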
    for namespace in listkeys:
        part = bundler.newpart('listkeys')
        part.addparam('namespace', namespace)
        keys = repo.listkeys(namespace).items()
        part.data = pushkey.encodekeys(keys)

@getbundle2partsgenerator('obsmarkers')
def _getbundleobsmarkerpart(bundler, repo, source, bundlecaps=None,
                            b2caps=None, heads=None, **kwargs):
    """add an obsolescence markers part to the requested bundle"""
    if kwargs.get('obsmarkers', False):
        if heads is None:
            heads = repo.heads()
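        # '::%ln' selects all ancestors of the given heads, so only markers
        # relevant to the outgoing changesets end up in the part.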
        subset = [c.node() for c in repo.set('::%ln', heads)]
        markers = repo.obsstore.relevantmarkers(subset)
        buildobsmarkerspart(bundler, markers)

def check_heads(repo, their_heads, context):
    """check if the heads of a repo have been modified

    Used by peer for unbundling.
    """
    heads = repo.heads()
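    # The remote may send the literal heads, the marker ['force'], or
    # ['hashed', <sha1 of the sorted, concatenated head nodes>]; the digest
    # below is computed the same way so the comparison detects a race.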
    heads_hash = util.sha1(''.join(sorted(heads))).digest()
    if not (their_heads == ['force'] or their_heads == heads or
            their_heads == ['hashed', heads_hash]):
        # someone else committed/pushed/unbundled while we
        # were transferring data
        raise error.PushRaced('repository changed while %s - '
                              'please try again' % context)

def unbundle(repo, cg, heads, source, url):
    """Apply a bundle to a repo.

    This function makes sure the repo is locked during the application and has
    a mechanism to check that no push race occurred between the creation of
    the bundle and its application.

    If the push was raced, a PushRaced exception is raised."""
    r = 0
    # need a transaction when processing a bundle2 stream
    wlock = lock = tr = None
    try:
        check_heads(repo, heads, 'uploading changes')
        # push can proceed
        if util.safehasattr(cg, 'params'):
            r = None
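            # 'r' is reset to None (rather than 0) so the except clause below
            # can tell whether a reply bundle was produced before a failure.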
            try:
                wlock = repo.wlock()
                lock = repo.lock()
                tr = repo.transaction(source)
                tr.hookargs['source'] = source
                tr.hookargs['url'] = url
                tr.hookargs['bundle2'] = '1'
                r = bundle2.processbundle(repo, cg, lambda: tr).reply
                tr.close()
            except Exception, exc:
                exc.duringunbundle2 = True
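                # If a reply bundle was already being built, salvage its
                # output parts onto the exception so the caller can still
                # relay the server's output despite the failure.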
                if r is not None:
                    exc._bundle2salvagedoutput = r.salvageoutput()
                raise
        else:
            lock = repo.lock()
            r = changegroup.addchangegroup(repo, cg, source, url)
    finally:
        lockmod.release(tr, lock, wlock)
    return r
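
# Editorial sketch (not part of this file): a hypothetical wire-protocol
# caller can now recover server output produced before a failure, since it
# is salvaged onto the exception raised during unbundle2:
#
#     try:
#         r = unbundle(repo, cg, their_heads, 'serve', url)
#     except Exception, exc:
#         if getattr(exc, 'duringunbundle2', False):
#             salvaged = getattr(exc, '_bundle2salvagedoutput', ())
#             # 'salvaged' holds the reply bundle's output parts; a caller
#             # could forward them to the client before reporting the error.
#         raise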