##// END OF EJS Templates
bundle2: introduce a bundle2caps function...
Pierre-Yves David -
r21644:17755dd8 default
parent child Browse files
Show More
@@ -1,855 +1,862 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: (16 bits integer)
34 :params size: (16 bits integer)
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: (16 bits inter)
67 :header size: (16 bits inter)
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name
88 :parttype: alphanumerical part name
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
122 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 Bundle processing
128 Bundle processing
129 ============================
129 ============================
130
130
131 Each part is processed in order using a "part handler". Handler are registered
131 Each part is processed in order using a "part handler". Handler are registered
132 for a certain part type.
132 for a certain part type.
133
133
134 The matching of a part to its handler is case insensitive. The case of the
134 The matching of a part to its handler is case insensitive. The case of the
135 part type is used to know if a part is mandatory or advisory. If the Part type
135 part type is used to know if a part is mandatory or advisory. If the Part type
136 contains any uppercase char it is considered mandatory. When no handler is
136 contains any uppercase char it is considered mandatory. When no handler is
137 known for a Mandatory part, the process is aborted and an exception is raised.
137 known for a Mandatory part, the process is aborted and an exception is raised.
138 If the part is advisory and no handler is known, the part is ignored. When the
138 If the part is advisory and no handler is known, the part is ignored. When the
139 process is aborted, the full bundle is still read from the stream to keep the
139 process is aborted, the full bundle is still read from the stream to keep the
140 channel usable. But none of the part read from an abort are processed. In the
140 channel usable. But none of the part read from an abort are processed. In the
141 future, dropping the stream may become an option for channel we do not care to
141 future, dropping the stream may become an option for channel we do not care to
142 preserve.
142 preserve.
143 """
143 """
144
144
145 import util
145 import util
146 import struct
146 import struct
147 import urllib
147 import urllib
148 import string
148 import string
149
149
150 import changegroup, error
150 import changegroup, error
151 from i18n import _
151 from i18n import _
152
152
153 _pack = struct.pack
153 _pack = struct.pack
154 _unpack = struct.unpack
154 _unpack = struct.unpack
155
155
156 _magicstring = 'HG2X'
156 _magicstring = 'HG2X'
157
157
158 _fstreamparamsize = '>H'
158 _fstreamparamsize = '>H'
159 _fpartheadersize = '>H'
159 _fpartheadersize = '>H'
160 _fparttypesize = '>B'
160 _fparttypesize = '>B'
161 _fpartid = '>I'
161 _fpartid = '>I'
162 _fpayloadsize = '>I'
162 _fpayloadsize = '>I'
163 _fpartparamcount = '>BB'
163 _fpartparamcount = '>BB'
164
164
165 preferedchunksize = 4096
165 preferedchunksize = 4096
166
166
167 def _makefpartparamsizes(nbparams):
167 def _makefpartparamsizes(nbparams):
168 """return a struct format to read part parameter sizes
168 """return a struct format to read part parameter sizes
169
169
170 The number parameters is variable so we need to build that format
170 The number parameters is variable so we need to build that format
171 dynamically.
171 dynamically.
172 """
172 """
173 return '>'+('BB'*nbparams)
173 return '>'+('BB'*nbparams)
174
174
175 parthandlermapping = {}
175 parthandlermapping = {}
176
176
177 def parthandler(parttype, params=()):
177 def parthandler(parttype, params=()):
178 """decorator that register a function as a bundle2 part handler
178 """decorator that register a function as a bundle2 part handler
179
179
180 eg::
180 eg::
181
181
182 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
182 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
183 def myparttypehandler(...):
183 def myparttypehandler(...):
184 '''process a part of type "my part".'''
184 '''process a part of type "my part".'''
185 ...
185 ...
186 """
186 """
187 def _decorator(func):
187 def _decorator(func):
188 lparttype = parttype.lower() # enforce lower case matching.
188 lparttype = parttype.lower() # enforce lower case matching.
189 assert lparttype not in parthandlermapping
189 assert lparttype not in parthandlermapping
190 parthandlermapping[lparttype] = func
190 parthandlermapping[lparttype] = func
191 func.params = frozenset(params)
191 func.params = frozenset(params)
192 return func
192 return func
193 return _decorator
193 return _decorator
194
194
195 class unbundlerecords(object):
195 class unbundlerecords(object):
196 """keep record of what happens during and unbundle
196 """keep record of what happens during and unbundle
197
197
198 New records are added using `records.add('cat', obj)`. Where 'cat' is a
198 New records are added using `records.add('cat', obj)`. Where 'cat' is a
199 category of record and obj is an arbitrary object.
199 category of record and obj is an arbitrary object.
200
200
201 `records['cat']` will return all entries of this category 'cat'.
201 `records['cat']` will return all entries of this category 'cat'.
202
202
203 Iterating on the object itself will yield `('category', obj)` tuples
203 Iterating on the object itself will yield `('category', obj)` tuples
204 for all entries.
204 for all entries.
205
205
206 All iterations happens in chronological order.
206 All iterations happens in chronological order.
207 """
207 """
208
208
209 def __init__(self):
209 def __init__(self):
210 self._categories = {}
210 self._categories = {}
211 self._sequences = []
211 self._sequences = []
212 self._replies = {}
212 self._replies = {}
213
213
214 def add(self, category, entry, inreplyto=None):
214 def add(self, category, entry, inreplyto=None):
215 """add a new record of a given category.
215 """add a new record of a given category.
216
216
217 The entry can then be retrieved in the list returned by
217 The entry can then be retrieved in the list returned by
218 self['category']."""
218 self['category']."""
219 self._categories.setdefault(category, []).append(entry)
219 self._categories.setdefault(category, []).append(entry)
220 self._sequences.append((category, entry))
220 self._sequences.append((category, entry))
221 if inreplyto is not None:
221 if inreplyto is not None:
222 self.getreplies(inreplyto).add(category, entry)
222 self.getreplies(inreplyto).add(category, entry)
223
223
224 def getreplies(self, partid):
224 def getreplies(self, partid):
225 """get the subrecords that replies to a specific part"""
225 """get the subrecords that replies to a specific part"""
226 return self._replies.setdefault(partid, unbundlerecords())
226 return self._replies.setdefault(partid, unbundlerecords())
227
227
228 def __getitem__(self, cat):
228 def __getitem__(self, cat):
229 return tuple(self._categories.get(cat, ()))
229 return tuple(self._categories.get(cat, ()))
230
230
231 def __iter__(self):
231 def __iter__(self):
232 return iter(self._sequences)
232 return iter(self._sequences)
233
233
234 def __len__(self):
234 def __len__(self):
235 return len(self._sequences)
235 return len(self._sequences)
236
236
237 def __nonzero__(self):
237 def __nonzero__(self):
238 return bool(self._sequences)
238 return bool(self._sequences)
239
239
240 class bundleoperation(object):
240 class bundleoperation(object):
241 """an object that represents a single bundling process
241 """an object that represents a single bundling process
242
242
243 Its purpose is to carry unbundle-related objects and states.
243 Its purpose is to carry unbundle-related objects and states.
244
244
245 A new object should be created at the beginning of each bundle processing.
245 A new object should be created at the beginning of each bundle processing.
246 The object is to be returned by the processing function.
246 The object is to be returned by the processing function.
247
247
248 The object has very little content now it will ultimately contain:
248 The object has very little content now it will ultimately contain:
249 * an access to the repo the bundle is applied to,
249 * an access to the repo the bundle is applied to,
250 * a ui object,
250 * a ui object,
251 * a way to retrieve a transaction to add changes to the repo,
251 * a way to retrieve a transaction to add changes to the repo,
252 * a way to record the result of processing each part,
252 * a way to record the result of processing each part,
253 * a way to construct a bundle response when applicable.
253 * a way to construct a bundle response when applicable.
254 """
254 """
255
255
256 def __init__(self, repo, transactiongetter):
256 def __init__(self, repo, transactiongetter):
257 self.repo = repo
257 self.repo = repo
258 self.ui = repo.ui
258 self.ui = repo.ui
259 self.records = unbundlerecords()
259 self.records = unbundlerecords()
260 self.gettransaction = transactiongetter
260 self.gettransaction = transactiongetter
261 self.reply = None
261 self.reply = None
262
262
263 class TransactionUnavailable(RuntimeError):
263 class TransactionUnavailable(RuntimeError):
264 pass
264 pass
265
265
266 def _notransaction():
266 def _notransaction():
267 """default method to get a transaction while processing a bundle
267 """default method to get a transaction while processing a bundle
268
268
269 Raise an exception to highlight the fact that no transaction was expected
269 Raise an exception to highlight the fact that no transaction was expected
270 to be created"""
270 to be created"""
271 raise TransactionUnavailable()
271 raise TransactionUnavailable()
272
272
273 def processbundle(repo, unbundler, transactiongetter=_notransaction):
273 def processbundle(repo, unbundler, transactiongetter=_notransaction):
274 """This function process a bundle, apply effect to/from a repo
274 """This function process a bundle, apply effect to/from a repo
275
275
276 It iterates over each part then searches for and uses the proper handling
276 It iterates over each part then searches for and uses the proper handling
277 code to process the part. Parts are processed in order.
277 code to process the part. Parts are processed in order.
278
278
279 This is very early version of this function that will be strongly reworked
279 This is very early version of this function that will be strongly reworked
280 before final usage.
280 before final usage.
281
281
282 Unknown Mandatory part will abort the process.
282 Unknown Mandatory part will abort the process.
283 """
283 """
284 op = bundleoperation(repo, transactiongetter)
284 op = bundleoperation(repo, transactiongetter)
285 # todo:
285 # todo:
286 # - replace this is a init function soon.
286 # - replace this is a init function soon.
287 # - exception catching
287 # - exception catching
288 unbundler.params
288 unbundler.params
289 iterparts = unbundler.iterparts()
289 iterparts = unbundler.iterparts()
290 part = None
290 part = None
291 try:
291 try:
292 for part in iterparts:
292 for part in iterparts:
293 parttype = part.type
293 parttype = part.type
294 # part key are matched lower case
294 # part key are matched lower case
295 key = parttype.lower()
295 key = parttype.lower()
296 try:
296 try:
297 handler = parthandlermapping.get(key)
297 handler = parthandlermapping.get(key)
298 if handler is None:
298 if handler is None:
299 raise error.BundleValueError(parttype=key)
299 raise error.BundleValueError(parttype=key)
300 op.ui.debug('found a handler for part %r\n' % parttype)
300 op.ui.debug('found a handler for part %r\n' % parttype)
301 unknownparams = part.mandatorykeys - handler.params
301 unknownparams = part.mandatorykeys - handler.params
302 if unknownparams:
302 if unknownparams:
303 unknownparams = list(unknownparams)
303 unknownparams = list(unknownparams)
304 unknownparams.sort()
304 unknownparams.sort()
305 raise error.BundleValueError(parttype=key,
305 raise error.BundleValueError(parttype=key,
306 params=unknownparams)
306 params=unknownparams)
307 except error.BundleValueError, exc:
307 except error.BundleValueError, exc:
308 if key != parttype: # mandatory parts
308 if key != parttype: # mandatory parts
309 raise
309 raise
310 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
310 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
311 # consuming the part
311 # consuming the part
312 part.read()
312 part.read()
313 continue
313 continue
314
314
315
315
316 # handler is called outside the above try block so that we don't
316 # handler is called outside the above try block so that we don't
317 # risk catching KeyErrors from anything other than the
317 # risk catching KeyErrors from anything other than the
318 # parthandlermapping lookup (any KeyError raised by handler()
318 # parthandlermapping lookup (any KeyError raised by handler()
319 # itself represents a defect of a different variety).
319 # itself represents a defect of a different variety).
320 output = None
320 output = None
321 if op.reply is not None:
321 if op.reply is not None:
322 op.ui.pushbuffer(error=True)
322 op.ui.pushbuffer(error=True)
323 output = ''
323 output = ''
324 try:
324 try:
325 handler(op, part)
325 handler(op, part)
326 finally:
326 finally:
327 if output is not None:
327 if output is not None:
328 output = op.ui.popbuffer()
328 output = op.ui.popbuffer()
329 if output:
329 if output:
330 outpart = op.reply.newpart('b2x:output', data=output)
330 outpart = op.reply.newpart('b2x:output', data=output)
331 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
331 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
332 part.read()
332 part.read()
333 except Exception, exc:
333 except Exception, exc:
334 if part is not None:
334 if part is not None:
335 # consume the bundle content
335 # consume the bundle content
336 part.read()
336 part.read()
337 for part in iterparts:
337 for part in iterparts:
338 # consume the bundle content
338 # consume the bundle content
339 part.read()
339 part.read()
340 # Small hack to let caller code distinguish exceptions from bundle2
340 # Small hack to let caller code distinguish exceptions from bundle2
341 # processing fron the ones from bundle1 processing. This is mostly
341 # processing fron the ones from bundle1 processing. This is mostly
342 # needed to handle different return codes to unbundle according to the
342 # needed to handle different return codes to unbundle according to the
343 # type of bundle. We should probably clean up or drop this return code
343 # type of bundle. We should probably clean up or drop this return code
344 # craziness in a future version.
344 # craziness in a future version.
345 exc.duringunbundle2 = True
345 exc.duringunbundle2 = True
346 raise
346 raise
347 return op
347 return op
348
348
349 def decodecaps(blob):
349 def decodecaps(blob):
350 """decode a bundle2 caps bytes blob into a dictionnary
350 """decode a bundle2 caps bytes blob into a dictionnary
351
351
352 The blob is a list of capabilities (one per line)
352 The blob is a list of capabilities (one per line)
353 Capabilities may have values using a line of the form::
353 Capabilities may have values using a line of the form::
354
354
355 capability=value1,value2,value3
355 capability=value1,value2,value3
356
356
357 The values are always a list."""
357 The values are always a list."""
358 caps = {}
358 caps = {}
359 for line in blob.splitlines():
359 for line in blob.splitlines():
360 if not line:
360 if not line:
361 continue
361 continue
362 if '=' not in line:
362 if '=' not in line:
363 key, vals = line, ()
363 key, vals = line, ()
364 else:
364 else:
365 key, vals = line.split('=', 1)
365 key, vals = line.split('=', 1)
366 vals = vals.split(',')
366 vals = vals.split(',')
367 key = urllib.unquote(key)
367 key = urllib.unquote(key)
368 vals = [urllib.unquote(v) for v in vals]
368 vals = [urllib.unquote(v) for v in vals]
369 caps[key] = vals
369 caps[key] = vals
370 return caps
370 return caps
371
371
372 def encodecaps(caps):
372 def encodecaps(caps):
373 """encode a bundle2 caps dictionary into a bytes blob"""
373 """encode a bundle2 caps dictionary into a bytes blob"""
374 chunks = []
374 chunks = []
375 for ca in sorted(caps):
375 for ca in sorted(caps):
376 vals = caps[ca]
376 vals = caps[ca]
377 ca = urllib.quote(ca)
377 ca = urllib.quote(ca)
378 vals = [urllib.quote(v) for v in vals]
378 vals = [urllib.quote(v) for v in vals]
379 if vals:
379 if vals:
380 ca = "%s=%s" % (ca, ','.join(vals))
380 ca = "%s=%s" % (ca, ','.join(vals))
381 chunks.append(ca)
381 chunks.append(ca)
382 return '\n'.join(chunks)
382 return '\n'.join(chunks)
383
383
384 class bundle20(object):
384 class bundle20(object):
385 """represent an outgoing bundle2 container
385 """represent an outgoing bundle2 container
386
386
387 Use the `addparam` method to add stream level parameter. and `newpart` to
387 Use the `addparam` method to add stream level parameter. and `newpart` to
388 populate it. Then call `getchunks` to retrieve all the binary chunks of
388 populate it. Then call `getchunks` to retrieve all the binary chunks of
389 data that compose the bundle2 container."""
389 data that compose the bundle2 container."""
390
390
391 def __init__(self, ui, capabilities=()):
391 def __init__(self, ui, capabilities=()):
392 self.ui = ui
392 self.ui = ui
393 self._params = []
393 self._params = []
394 self._parts = []
394 self._parts = []
395 self.capabilities = dict(capabilities)
395 self.capabilities = dict(capabilities)
396
396
397 # methods used to defines the bundle2 content
397 # methods used to defines the bundle2 content
398 def addparam(self, name, value=None):
398 def addparam(self, name, value=None):
399 """add a stream level parameter"""
399 """add a stream level parameter"""
400 if not name:
400 if not name:
401 raise ValueError('empty parameter name')
401 raise ValueError('empty parameter name')
402 if name[0] not in string.letters:
402 if name[0] not in string.letters:
403 raise ValueError('non letter first character: %r' % name)
403 raise ValueError('non letter first character: %r' % name)
404 self._params.append((name, value))
404 self._params.append((name, value))
405
405
406 def addpart(self, part):
406 def addpart(self, part):
407 """add a new part to the bundle2 container
407 """add a new part to the bundle2 container
408
408
409 Parts contains the actual applicative payload."""
409 Parts contains the actual applicative payload."""
410 assert part.id is None
410 assert part.id is None
411 part.id = len(self._parts) # very cheap counter
411 part.id = len(self._parts) # very cheap counter
412 self._parts.append(part)
412 self._parts.append(part)
413
413
414 def newpart(self, typeid, *args, **kwargs):
414 def newpart(self, typeid, *args, **kwargs):
415 """create a new part and add it to the containers
415 """create a new part and add it to the containers
416
416
417 As the part is directly added to the containers. For now, this means
417 As the part is directly added to the containers. For now, this means
418 that any failure to properly initialize the part after calling
418 that any failure to properly initialize the part after calling
419 ``newpart`` should result in a failure of the whole bundling process.
419 ``newpart`` should result in a failure of the whole bundling process.
420
420
421 You can still fall back to manually create and add if you need better
421 You can still fall back to manually create and add if you need better
422 control."""
422 control."""
423 part = bundlepart(typeid, *args, **kwargs)
423 part = bundlepart(typeid, *args, **kwargs)
424 self.addpart(part)
424 self.addpart(part)
425 return part
425 return part
426
426
427 # methods used to generate the bundle2 stream
427 # methods used to generate the bundle2 stream
428 def getchunks(self):
428 def getchunks(self):
429 self.ui.debug('start emission of %s stream\n' % _magicstring)
429 self.ui.debug('start emission of %s stream\n' % _magicstring)
430 yield _magicstring
430 yield _magicstring
431 param = self._paramchunk()
431 param = self._paramchunk()
432 self.ui.debug('bundle parameter: %s\n' % param)
432 self.ui.debug('bundle parameter: %s\n' % param)
433 yield _pack(_fstreamparamsize, len(param))
433 yield _pack(_fstreamparamsize, len(param))
434 if param:
434 if param:
435 yield param
435 yield param
436
436
437 self.ui.debug('start of parts\n')
437 self.ui.debug('start of parts\n')
438 for part in self._parts:
438 for part in self._parts:
439 self.ui.debug('bundle part: "%s"\n' % part.type)
439 self.ui.debug('bundle part: "%s"\n' % part.type)
440 for chunk in part.getchunks():
440 for chunk in part.getchunks():
441 yield chunk
441 yield chunk
442 self.ui.debug('end of bundle\n')
442 self.ui.debug('end of bundle\n')
443 yield '\0\0'
443 yield '\0\0'
444
444
445 def _paramchunk(self):
445 def _paramchunk(self):
446 """return a encoded version of all stream parameters"""
446 """return a encoded version of all stream parameters"""
447 blocks = []
447 blocks = []
448 for par, value in self._params:
448 for par, value in self._params:
449 par = urllib.quote(par)
449 par = urllib.quote(par)
450 if value is not None:
450 if value is not None:
451 value = urllib.quote(value)
451 value = urllib.quote(value)
452 par = '%s=%s' % (par, value)
452 par = '%s=%s' % (par, value)
453 blocks.append(par)
453 blocks.append(par)
454 return ' '.join(blocks)
454 return ' '.join(blocks)
455
455
456 class unpackermixin(object):
456 class unpackermixin(object):
457 """A mixin to extract bytes and struct data from a stream"""
457 """A mixin to extract bytes and struct data from a stream"""
458
458
459 def __init__(self, fp):
459 def __init__(self, fp):
460 self._fp = fp
460 self._fp = fp
461
461
462 def _unpack(self, format):
462 def _unpack(self, format):
463 """unpack this struct format from the stream"""
463 """unpack this struct format from the stream"""
464 data = self._readexact(struct.calcsize(format))
464 data = self._readexact(struct.calcsize(format))
465 return _unpack(format, data)
465 return _unpack(format, data)
466
466
467 def _readexact(self, size):
467 def _readexact(self, size):
468 """read exactly <size> bytes from the stream"""
468 """read exactly <size> bytes from the stream"""
469 return changegroup.readexactly(self._fp, size)
469 return changegroup.readexactly(self._fp, size)
470
470
471
471
472 class unbundle20(unpackermixin):
472 class unbundle20(unpackermixin):
473 """interpret a bundle2 stream
473 """interpret a bundle2 stream
474
474
475 This class is fed with a binary stream and yields parts through its
475 This class is fed with a binary stream and yields parts through its
476 `iterparts` methods."""
476 `iterparts` methods."""
477
477
478 def __init__(self, ui, fp, header=None):
478 def __init__(self, ui, fp, header=None):
479 """If header is specified, we do not read it out of the stream."""
479 """If header is specified, we do not read it out of the stream."""
480 self.ui = ui
480 self.ui = ui
481 super(unbundle20, self).__init__(fp)
481 super(unbundle20, self).__init__(fp)
482 if header is None:
482 if header is None:
483 header = self._readexact(4)
483 header = self._readexact(4)
484 magic, version = header[0:2], header[2:4]
484 magic, version = header[0:2], header[2:4]
485 if magic != 'HG':
485 if magic != 'HG':
486 raise util.Abort(_('not a Mercurial bundle'))
486 raise util.Abort(_('not a Mercurial bundle'))
487 if version != '2X':
487 if version != '2X':
488 raise util.Abort(_('unknown bundle version %s') % version)
488 raise util.Abort(_('unknown bundle version %s') % version)
489 self.ui.debug('start processing of %s stream\n' % header)
489 self.ui.debug('start processing of %s stream\n' % header)
490
490
491 @util.propertycache
491 @util.propertycache
492 def params(self):
492 def params(self):
493 """dictionary of stream level parameters"""
493 """dictionary of stream level parameters"""
494 self.ui.debug('reading bundle2 stream parameters\n')
494 self.ui.debug('reading bundle2 stream parameters\n')
495 params = {}
495 params = {}
496 paramssize = self._unpack(_fstreamparamsize)[0]
496 paramssize = self._unpack(_fstreamparamsize)[0]
497 if paramssize:
497 if paramssize:
498 for p in self._readexact(paramssize).split(' '):
498 for p in self._readexact(paramssize).split(' '):
499 p = p.split('=', 1)
499 p = p.split('=', 1)
500 p = [urllib.unquote(i) for i in p]
500 p = [urllib.unquote(i) for i in p]
501 if len(p) < 2:
501 if len(p) < 2:
502 p.append(None)
502 p.append(None)
503 self._processparam(*p)
503 self._processparam(*p)
504 params[p[0]] = p[1]
504 params[p[0]] = p[1]
505 return params
505 return params
506
506
507 def _processparam(self, name, value):
507 def _processparam(self, name, value):
508 """process a parameter, applying its effect if needed
508 """process a parameter, applying its effect if needed
509
509
510 Parameter starting with a lower case letter are advisory and will be
510 Parameter starting with a lower case letter are advisory and will be
511 ignored when unknown. Those starting with an upper case letter are
511 ignored when unknown. Those starting with an upper case letter are
512 mandatory and will this function will raise a KeyError when unknown.
512 mandatory and will this function will raise a KeyError when unknown.
513
513
514 Note: no option are currently supported. Any input will be either
514 Note: no option are currently supported. Any input will be either
515 ignored or failing.
515 ignored or failing.
516 """
516 """
517 if not name:
517 if not name:
518 raise ValueError('empty parameter name')
518 raise ValueError('empty parameter name')
519 if name[0] not in string.letters:
519 if name[0] not in string.letters:
520 raise ValueError('non letter first character: %r' % name)
520 raise ValueError('non letter first character: %r' % name)
521 # Some logic will be later added here to try to process the option for
521 # Some logic will be later added here to try to process the option for
522 # a dict of known parameter.
522 # a dict of known parameter.
523 if name[0].islower():
523 if name[0].islower():
524 self.ui.debug("ignoring unknown parameter %r\n" % name)
524 self.ui.debug("ignoring unknown parameter %r\n" % name)
525 else:
525 else:
526 raise error.BundleValueError(params=(name,))
526 raise error.BundleValueError(params=(name,))
527
527
528
528
529 def iterparts(self):
529 def iterparts(self):
530 """yield all parts contained in the stream"""
530 """yield all parts contained in the stream"""
531 # make sure param have been loaded
531 # make sure param have been loaded
532 self.params
532 self.params
533 self.ui.debug('start extraction of bundle2 parts\n')
533 self.ui.debug('start extraction of bundle2 parts\n')
534 headerblock = self._readpartheader()
534 headerblock = self._readpartheader()
535 while headerblock is not None:
535 while headerblock is not None:
536 part = unbundlepart(self.ui, headerblock, self._fp)
536 part = unbundlepart(self.ui, headerblock, self._fp)
537 yield part
537 yield part
538 headerblock = self._readpartheader()
538 headerblock = self._readpartheader()
539 self.ui.debug('end of bundle2 stream\n')
539 self.ui.debug('end of bundle2 stream\n')
540
540
541 def _readpartheader(self):
541 def _readpartheader(self):
542 """reads a part header size and return the bytes blob
542 """reads a part header size and return the bytes blob
543
543
544 returns None if empty"""
544 returns None if empty"""
545 headersize = self._unpack(_fpartheadersize)[0]
545 headersize = self._unpack(_fpartheadersize)[0]
546 self.ui.debug('part header size: %i\n' % headersize)
546 self.ui.debug('part header size: %i\n' % headersize)
547 if headersize:
547 if headersize:
548 return self._readexact(headersize)
548 return self._readexact(headersize)
549 return None
549 return None
550
550
551
551
552 class bundlepart(object):
552 class bundlepart(object):
553 """A bundle2 part contains application level payload
553 """A bundle2 part contains application level payload
554
554
555 The part `type` is used to route the part to the application level
555 The part `type` is used to route the part to the application level
556 handler.
556 handler.
557
557
558 The part payload is contained in ``part.data``. It could be raw bytes or a
558 The part payload is contained in ``part.data``. It could be raw bytes or a
559 generator of byte chunks.
559 generator of byte chunks.
560
560
561 You can add parameters to the part using the ``addparam`` method.
561 You can add parameters to the part using the ``addparam`` method.
562 Parameters can be either mandatory (default) or advisory. Remote side
562 Parameters can be either mandatory (default) or advisory. Remote side
563 should be able to safely ignore the advisory ones.
563 should be able to safely ignore the advisory ones.
564
564
565 Both data and parameters cannot be modified after the generation has begun.
565 Both data and parameters cannot be modified after the generation has begun.
566 """
566 """
567
567
568 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
568 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
569 data=''):
569 data=''):
570 self.id = None
570 self.id = None
571 self.type = parttype
571 self.type = parttype
572 self._data = data
572 self._data = data
573 self._mandatoryparams = list(mandatoryparams)
573 self._mandatoryparams = list(mandatoryparams)
574 self._advisoryparams = list(advisoryparams)
574 self._advisoryparams = list(advisoryparams)
575 # checking for duplicated entries
575 # checking for duplicated entries
576 self._seenparams = set()
576 self._seenparams = set()
577 for pname, __ in self._mandatoryparams + self._advisoryparams:
577 for pname, __ in self._mandatoryparams + self._advisoryparams:
578 if pname in self._seenparams:
578 if pname in self._seenparams:
579 raise RuntimeError('duplicated params: %s' % pname)
579 raise RuntimeError('duplicated params: %s' % pname)
580 self._seenparams.add(pname)
580 self._seenparams.add(pname)
581 # status of the part's generation:
581 # status of the part's generation:
582 # - None: not started,
582 # - None: not started,
583 # - False: currently generated,
583 # - False: currently generated,
584 # - True: generation done.
584 # - True: generation done.
585 self._generated = None
585 self._generated = None
586
586
587 # methods used to defines the part content
587 # methods used to defines the part content
588 def __setdata(self, data):
588 def __setdata(self, data):
589 if self._generated is not None:
589 if self._generated is not None:
590 raise error.ReadOnlyPartError('part is being generated')
590 raise error.ReadOnlyPartError('part is being generated')
591 self._data = data
591 self._data = data
592 def __getdata(self):
592 def __getdata(self):
593 return self._data
593 return self._data
594 data = property(__getdata, __setdata)
594 data = property(__getdata, __setdata)
595
595
596 @property
596 @property
597 def mandatoryparams(self):
597 def mandatoryparams(self):
598 # make it an immutable tuple to force people through ``addparam``
598 # make it an immutable tuple to force people through ``addparam``
599 return tuple(self._mandatoryparams)
599 return tuple(self._mandatoryparams)
600
600
601 @property
601 @property
602 def advisoryparams(self):
602 def advisoryparams(self):
603 # make it an immutable tuple to force people through ``addparam``
603 # make it an immutable tuple to force people through ``addparam``
604 return tuple(self._advisoryparams)
604 return tuple(self._advisoryparams)
605
605
606 def addparam(self, name, value='', mandatory=True):
606 def addparam(self, name, value='', mandatory=True):
607 if self._generated is not None:
607 if self._generated is not None:
608 raise error.ReadOnlyPartError('part is being generated')
608 raise error.ReadOnlyPartError('part is being generated')
609 if name in self._seenparams:
609 if name in self._seenparams:
610 raise ValueError('duplicated params: %s' % name)
610 raise ValueError('duplicated params: %s' % name)
611 self._seenparams.add(name)
611 self._seenparams.add(name)
612 params = self._advisoryparams
612 params = self._advisoryparams
613 if mandatory:
613 if mandatory:
614 params = self._mandatoryparams
614 params = self._mandatoryparams
615 params.append((name, value))
615 params.append((name, value))
616
616
617 # methods used to generates the bundle2 stream
617 # methods used to generates the bundle2 stream
618 def getchunks(self):
618 def getchunks(self):
619 if self._generated is not None:
619 if self._generated is not None:
620 raise RuntimeError('part can only be consumed once')
620 raise RuntimeError('part can only be consumed once')
621 self._generated = False
621 self._generated = False
622 #### header
622 #### header
623 ## parttype
623 ## parttype
624 header = [_pack(_fparttypesize, len(self.type)),
624 header = [_pack(_fparttypesize, len(self.type)),
625 self.type, _pack(_fpartid, self.id),
625 self.type, _pack(_fpartid, self.id),
626 ]
626 ]
627 ## parameters
627 ## parameters
628 # count
628 # count
629 manpar = self.mandatoryparams
629 manpar = self.mandatoryparams
630 advpar = self.advisoryparams
630 advpar = self.advisoryparams
631 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
631 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
632 # size
632 # size
633 parsizes = []
633 parsizes = []
634 for key, value in manpar:
634 for key, value in manpar:
635 parsizes.append(len(key))
635 parsizes.append(len(key))
636 parsizes.append(len(value))
636 parsizes.append(len(value))
637 for key, value in advpar:
637 for key, value in advpar:
638 parsizes.append(len(key))
638 parsizes.append(len(key))
639 parsizes.append(len(value))
639 parsizes.append(len(value))
640 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
640 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
641 header.append(paramsizes)
641 header.append(paramsizes)
642 # key, value
642 # key, value
643 for key, value in manpar:
643 for key, value in manpar:
644 header.append(key)
644 header.append(key)
645 header.append(value)
645 header.append(value)
646 for key, value in advpar:
646 for key, value in advpar:
647 header.append(key)
647 header.append(key)
648 header.append(value)
648 header.append(value)
649 ## finalize header
649 ## finalize header
650 headerchunk = ''.join(header)
650 headerchunk = ''.join(header)
651 yield _pack(_fpartheadersize, len(headerchunk))
651 yield _pack(_fpartheadersize, len(headerchunk))
652 yield headerchunk
652 yield headerchunk
653 ## payload
653 ## payload
654 for chunk in self._payloadchunks():
654 for chunk in self._payloadchunks():
655 yield _pack(_fpayloadsize, len(chunk))
655 yield _pack(_fpayloadsize, len(chunk))
656 yield chunk
656 yield chunk
657 # end of payload
657 # end of payload
658 yield _pack(_fpayloadsize, 0)
658 yield _pack(_fpayloadsize, 0)
659 self._generated = True
659 self._generated = True
660
660
661 def _payloadchunks(self):
661 def _payloadchunks(self):
662 """yield chunks of a the part payload
662 """yield chunks of a the part payload
663
663
664 Exists to handle the different methods to provide data to a part."""
664 Exists to handle the different methods to provide data to a part."""
665 # we only support fixed size data now.
665 # we only support fixed size data now.
666 # This will be improved in the future.
666 # This will be improved in the future.
667 if util.safehasattr(self.data, 'next'):
667 if util.safehasattr(self.data, 'next'):
668 buff = util.chunkbuffer(self.data)
668 buff = util.chunkbuffer(self.data)
669 chunk = buff.read(preferedchunksize)
669 chunk = buff.read(preferedchunksize)
670 while chunk:
670 while chunk:
671 yield chunk
671 yield chunk
672 chunk = buff.read(preferedchunksize)
672 chunk = buff.read(preferedchunksize)
673 elif len(self.data):
673 elif len(self.data):
674 yield self.data
674 yield self.data
675
675
676 class unbundlepart(unpackermixin):
676 class unbundlepart(unpackermixin):
677 """a bundle part read from a bundle"""
677 """a bundle part read from a bundle"""
678
678
679 def __init__(self, ui, header, fp):
679 def __init__(self, ui, header, fp):
680 super(unbundlepart, self).__init__(fp)
680 super(unbundlepart, self).__init__(fp)
681 self.ui = ui
681 self.ui = ui
682 # unbundle state attr
682 # unbundle state attr
683 self._headerdata = header
683 self._headerdata = header
684 self._headeroffset = 0
684 self._headeroffset = 0
685 self._initialized = False
685 self._initialized = False
686 self.consumed = False
686 self.consumed = False
687 # part data
687 # part data
688 self.id = None
688 self.id = None
689 self.type = None
689 self.type = None
690 self.mandatoryparams = None
690 self.mandatoryparams = None
691 self.advisoryparams = None
691 self.advisoryparams = None
692 self.params = None
692 self.params = None
693 self.mandatorykeys = ()
693 self.mandatorykeys = ()
694 self._payloadstream = None
694 self._payloadstream = None
695 self._readheader()
695 self._readheader()
696
696
697 def _fromheader(self, size):
697 def _fromheader(self, size):
698 """return the next <size> byte from the header"""
698 """return the next <size> byte from the header"""
699 offset = self._headeroffset
699 offset = self._headeroffset
700 data = self._headerdata[offset:(offset + size)]
700 data = self._headerdata[offset:(offset + size)]
701 self._headeroffset = offset + size
701 self._headeroffset = offset + size
702 return data
702 return data
703
703
704 def _unpackheader(self, format):
704 def _unpackheader(self, format):
705 """read given format from header
705 """read given format from header
706
706
707 This automatically compute the size of the format to read."""
707 This automatically compute the size of the format to read."""
708 data = self._fromheader(struct.calcsize(format))
708 data = self._fromheader(struct.calcsize(format))
709 return _unpack(format, data)
709 return _unpack(format, data)
710
710
711 def _initparams(self, mandatoryparams, advisoryparams):
711 def _initparams(self, mandatoryparams, advisoryparams):
712 """internal function to setup all logic related parameters"""
712 """internal function to setup all logic related parameters"""
713 # make it read only to prevent people touching it by mistake.
713 # make it read only to prevent people touching it by mistake.
714 self.mandatoryparams = tuple(mandatoryparams)
714 self.mandatoryparams = tuple(mandatoryparams)
715 self.advisoryparams = tuple(advisoryparams)
715 self.advisoryparams = tuple(advisoryparams)
716 # user friendly UI
716 # user friendly UI
717 self.params = dict(self.mandatoryparams)
717 self.params = dict(self.mandatoryparams)
718 self.params.update(dict(self.advisoryparams))
718 self.params.update(dict(self.advisoryparams))
719 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
719 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
720
720
721 def _readheader(self):
721 def _readheader(self):
722 """read the header and setup the object"""
722 """read the header and setup the object"""
723 typesize = self._unpackheader(_fparttypesize)[0]
723 typesize = self._unpackheader(_fparttypesize)[0]
724 self.type = self._fromheader(typesize)
724 self.type = self._fromheader(typesize)
725 self.ui.debug('part type: "%s"\n' % self.type)
725 self.ui.debug('part type: "%s"\n' % self.type)
726 self.id = self._unpackheader(_fpartid)[0]
726 self.id = self._unpackheader(_fpartid)[0]
727 self.ui.debug('part id: "%s"\n' % self.id)
727 self.ui.debug('part id: "%s"\n' % self.id)
728 ## reading parameters
728 ## reading parameters
729 # param count
729 # param count
730 mancount, advcount = self._unpackheader(_fpartparamcount)
730 mancount, advcount = self._unpackheader(_fpartparamcount)
731 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
731 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
732 # param size
732 # param size
733 fparamsizes = _makefpartparamsizes(mancount + advcount)
733 fparamsizes = _makefpartparamsizes(mancount + advcount)
734 paramsizes = self._unpackheader(fparamsizes)
734 paramsizes = self._unpackheader(fparamsizes)
735 # make it a list of couple again
735 # make it a list of couple again
736 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
736 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
737 # split mandatory from advisory
737 # split mandatory from advisory
738 mansizes = paramsizes[:mancount]
738 mansizes = paramsizes[:mancount]
739 advsizes = paramsizes[mancount:]
739 advsizes = paramsizes[mancount:]
740 # retrive param value
740 # retrive param value
741 manparams = []
741 manparams = []
742 for key, value in mansizes:
742 for key, value in mansizes:
743 manparams.append((self._fromheader(key), self._fromheader(value)))
743 manparams.append((self._fromheader(key), self._fromheader(value)))
744 advparams = []
744 advparams = []
745 for key, value in advsizes:
745 for key, value in advsizes:
746 advparams.append((self._fromheader(key), self._fromheader(value)))
746 advparams.append((self._fromheader(key), self._fromheader(value)))
747 self._initparams(manparams, advparams)
747 self._initparams(manparams, advparams)
748 ## part payload
748 ## part payload
749 def payloadchunks():
749 def payloadchunks():
750 payloadsize = self._unpack(_fpayloadsize)[0]
750 payloadsize = self._unpack(_fpayloadsize)[0]
751 self.ui.debug('payload chunk size: %i\n' % payloadsize)
751 self.ui.debug('payload chunk size: %i\n' % payloadsize)
752 while payloadsize:
752 while payloadsize:
753 yield self._readexact(payloadsize)
753 yield self._readexact(payloadsize)
754 payloadsize = self._unpack(_fpayloadsize)[0]
754 payloadsize = self._unpack(_fpayloadsize)[0]
755 self.ui.debug('payload chunk size: %i\n' % payloadsize)
755 self.ui.debug('payload chunk size: %i\n' % payloadsize)
756 self._payloadstream = util.chunkbuffer(payloadchunks())
756 self._payloadstream = util.chunkbuffer(payloadchunks())
757 # we read the data, tell it
757 # we read the data, tell it
758 self._initialized = True
758 self._initialized = True
759
759
760 def read(self, size=None):
760 def read(self, size=None):
761 """read payload data"""
761 """read payload data"""
762 if not self._initialized:
762 if not self._initialized:
763 self._readheader()
763 self._readheader()
764 if size is None:
764 if size is None:
765 data = self._payloadstream.read()
765 data = self._payloadstream.read()
766 else:
766 else:
767 data = self._payloadstream.read(size)
767 data = self._payloadstream.read(size)
768 if size is None or len(data) < size:
768 if size is None or len(data) < size:
769 self.consumed = True
769 self.consumed = True
770 return data
770 return data
771
771
772 def bundle2caps(remote):
773 """return the bundlecapabilities of a peer as dict"""
774 raw = remote.capable('bundle2-exp')
775 if not raw and raw != '':
776 return {}
777 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
778 return decodecaps(capsblob)
772
779
773 @parthandler('b2x:changegroup')
780 @parthandler('b2x:changegroup')
774 def handlechangegroup(op, inpart):
781 def handlechangegroup(op, inpart):
775 """apply a changegroup part on the repo
782 """apply a changegroup part on the repo
776
783
777 This is a very early implementation that will massive rework before being
784 This is a very early implementation that will massive rework before being
778 inflicted to any end-user.
785 inflicted to any end-user.
779 """
786 """
780 # Make sure we trigger a transaction creation
787 # Make sure we trigger a transaction creation
781 #
788 #
782 # The addchangegroup function will get a transaction object by itself, but
789 # The addchangegroup function will get a transaction object by itself, but
783 # we need to make sure we trigger the creation of a transaction object used
790 # we need to make sure we trigger the creation of a transaction object used
784 # for the whole processing scope.
791 # for the whole processing scope.
785 op.gettransaction()
792 op.gettransaction()
786 cg = changegroup.unbundle10(inpart, 'UN')
793 cg = changegroup.unbundle10(inpart, 'UN')
787 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
794 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
788 op.records.add('changegroup', {'return': ret})
795 op.records.add('changegroup', {'return': ret})
789 if op.reply is not None:
796 if op.reply is not None:
790 # This is definitly not the final form of this
797 # This is definitly not the final form of this
791 # return. But one need to start somewhere.
798 # return. But one need to start somewhere.
792 part = op.reply.newpart('b2x:reply:changegroup')
799 part = op.reply.newpart('b2x:reply:changegroup')
793 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
800 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
794 part.addparam('return', '%i' % ret, mandatory=False)
801 part.addparam('return', '%i' % ret, mandatory=False)
795 assert not inpart.read()
802 assert not inpart.read()
796
803
797 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
804 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
798 def handlechangegroup(op, inpart):
805 def handlechangegroup(op, inpart):
799 ret = int(inpart.params['return'])
806 ret = int(inpart.params['return'])
800 replyto = int(inpart.params['in-reply-to'])
807 replyto = int(inpart.params['in-reply-to'])
801 op.records.add('changegroup', {'return': ret}, replyto)
808 op.records.add('changegroup', {'return': ret}, replyto)
802
809
803 @parthandler('b2x:check:heads')
810 @parthandler('b2x:check:heads')
804 def handlechangegroup(op, inpart):
811 def handlechangegroup(op, inpart):
805 """check that head of the repo did not change
812 """check that head of the repo did not change
806
813
807 This is used to detect a push race when using unbundle.
814 This is used to detect a push race when using unbundle.
808 This replaces the "heads" argument of unbundle."""
815 This replaces the "heads" argument of unbundle."""
809 h = inpart.read(20)
816 h = inpart.read(20)
810 heads = []
817 heads = []
811 while len(h) == 20:
818 while len(h) == 20:
812 heads.append(h)
819 heads.append(h)
813 h = inpart.read(20)
820 h = inpart.read(20)
814 assert not h
821 assert not h
815 if heads != op.repo.heads():
822 if heads != op.repo.heads():
816 raise error.PushRaced('repository changed while pushing - '
823 raise error.PushRaced('repository changed while pushing - '
817 'please try again')
824 'please try again')
818
825
819 @parthandler('b2x:output')
826 @parthandler('b2x:output')
820 def handleoutput(op, inpart):
827 def handleoutput(op, inpart):
821 """forward output captured on the server to the client"""
828 """forward output captured on the server to the client"""
822 for line in inpart.read().splitlines():
829 for line in inpart.read().splitlines():
823 op.ui.write(('remote: %s\n' % line))
830 op.ui.write(('remote: %s\n' % line))
824
831
825 @parthandler('b2x:replycaps')
832 @parthandler('b2x:replycaps')
826 def handlereplycaps(op, inpart):
833 def handlereplycaps(op, inpart):
827 """Notify that a reply bundle should be created
834 """Notify that a reply bundle should be created
828
835
829 The payload contains the capabilities information for the reply"""
836 The payload contains the capabilities information for the reply"""
830 caps = decodecaps(inpart.read())
837 caps = decodecaps(inpart.read())
831 if op.reply is None:
838 if op.reply is None:
832 op.reply = bundle20(op.ui, caps)
839 op.reply = bundle20(op.ui, caps)
833
840
834 @parthandler('b2x:error:abort', ('message', 'hint'))
841 @parthandler('b2x:error:abort', ('message', 'hint'))
835 def handlereplycaps(op, inpart):
842 def handlereplycaps(op, inpart):
836 """Used to transmit abort error over the wire"""
843 """Used to transmit abort error over the wire"""
837 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
844 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
838
845
839 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
846 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
840 def handlereplycaps(op, inpart):
847 def handlereplycaps(op, inpart):
841 """Used to transmit unknown content error over the wire"""
848 """Used to transmit unknown content error over the wire"""
842 kwargs = {}
849 kwargs = {}
843 parttype = inpart.params.get('parttype')
850 parttype = inpart.params.get('parttype')
844 if parttype is not None:
851 if parttype is not None:
845 kwargs['parttype'] = parttype
852 kwargs['parttype'] = parttype
846 params = inpart.params.get('params')
853 params = inpart.params.get('params')
847 if params is not None:
854 if params is not None:
848 kwargs['params'] = params.split('\0')
855 kwargs['params'] = params.split('\0')
849
856
850 raise error.BundleValueError(**kwargs)
857 raise error.BundleValueError(**kwargs)
851
858
852 @parthandler('b2x:error:pushraced', ('message',))
859 @parthandler('b2x:error:pushraced', ('message',))
853 def handlereplycaps(op, inpart):
860 def handlereplycaps(op, inpart):
854 """Used to transmit push race error over the wire"""
861 """Used to transmit push race error over the wire"""
855 raise error.ResponseError(_('push failed:'), inpart.params['message'])
862 raise error.ResponseError(_('push failed:'), inpart.params['message'])
@@ -1,730 +1,728 b''
1 # exchange.py - utility to exchange data between repos.
1 # exchange.py - utility to exchange data between repos.
2 #
2 #
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from i18n import _
8 from i18n import _
9 from node import hex, nullid
9 from node import hex, nullid
10 import errno, urllib
10 import errno, urllib
11 import util, scmutil, changegroup, base85, error
11 import util, scmutil, changegroup, base85, error
12 import discovery, phases, obsolete, bookmarks, bundle2
12 import discovery, phases, obsolete, bookmarks, bundle2
13
13
14 def readbundle(ui, fh, fname, vfs=None):
14 def readbundle(ui, fh, fname, vfs=None):
15 header = changegroup.readexactly(fh, 4)
15 header = changegroup.readexactly(fh, 4)
16
16
17 alg = None
17 alg = None
18 if not fname:
18 if not fname:
19 fname = "stream"
19 fname = "stream"
20 if not header.startswith('HG') and header.startswith('\0'):
20 if not header.startswith('HG') and header.startswith('\0'):
21 fh = changegroup.headerlessfixup(fh, header)
21 fh = changegroup.headerlessfixup(fh, header)
22 header = "HG10"
22 header = "HG10"
23 alg = 'UN'
23 alg = 'UN'
24 elif vfs:
24 elif vfs:
25 fname = vfs.join(fname)
25 fname = vfs.join(fname)
26
26
27 magic, version = header[0:2], header[2:4]
27 magic, version = header[0:2], header[2:4]
28
28
29 if magic != 'HG':
29 if magic != 'HG':
30 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
30 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
31 if version == '10':
31 if version == '10':
32 if alg is None:
32 if alg is None:
33 alg = changegroup.readexactly(fh, 2)
33 alg = changegroup.readexactly(fh, 2)
34 return changegroup.unbundle10(fh, alg)
34 return changegroup.unbundle10(fh, alg)
35 elif version == '2X':
35 elif version == '2X':
36 return bundle2.unbundle20(ui, fh, header=magic + version)
36 return bundle2.unbundle20(ui, fh, header=magic + version)
37 else:
37 else:
38 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
38 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
39
39
40
40
41 class pushoperation(object):
41 class pushoperation(object):
42 """A object that represent a single push operation
42 """A object that represent a single push operation
43
43
44 It purpose is to carry push related state and very common operation.
44 It purpose is to carry push related state and very common operation.
45
45
46 A new should be created at the beginning of each push and discarded
46 A new should be created at the beginning of each push and discarded
47 afterward.
47 afterward.
48 """
48 """
49
49
50 def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
50 def __init__(self, repo, remote, force=False, revs=None, newbranch=False):
51 # repo we push from
51 # repo we push from
52 self.repo = repo
52 self.repo = repo
53 self.ui = repo.ui
53 self.ui = repo.ui
54 # repo we push to
54 # repo we push to
55 self.remote = remote
55 self.remote = remote
56 # force option provided
56 # force option provided
57 self.force = force
57 self.force = force
58 # revs to be pushed (None is "all")
58 # revs to be pushed (None is "all")
59 self.revs = revs
59 self.revs = revs
60 # allow push of new branch
60 # allow push of new branch
61 self.newbranch = newbranch
61 self.newbranch = newbranch
62 # did a local lock get acquired?
62 # did a local lock get acquired?
63 self.locallocked = None
63 self.locallocked = None
64 # Integer version of the push result
64 # Integer version of the push result
65 # - None means nothing to push
65 # - None means nothing to push
66 # - 0 means HTTP error
66 # - 0 means HTTP error
67 # - 1 means we pushed and remote head count is unchanged *or*
67 # - 1 means we pushed and remote head count is unchanged *or*
68 # we have outgoing changesets but refused to push
68 # we have outgoing changesets but refused to push
69 # - other values as described by addchangegroup()
69 # - other values as described by addchangegroup()
70 self.ret = None
70 self.ret = None
71 # discover.outgoing object (contains common and outgoing data)
71 # discover.outgoing object (contains common and outgoing data)
72 self.outgoing = None
72 self.outgoing = None
73 # all remote heads before the push
73 # all remote heads before the push
74 self.remoteheads = None
74 self.remoteheads = None
75 # testable as a boolean indicating if any nodes are missing locally.
75 # testable as a boolean indicating if any nodes are missing locally.
76 self.incoming = None
76 self.incoming = None
77 # set of all heads common after changeset bundle push
77 # set of all heads common after changeset bundle push
78 self.commonheads = None
78 self.commonheads = None
79
79
80 def push(repo, remote, force=False, revs=None, newbranch=False):
80 def push(repo, remote, force=False, revs=None, newbranch=False):
81 '''Push outgoing changesets (limited by revs) from a local
81 '''Push outgoing changesets (limited by revs) from a local
82 repository to remote. Return an integer:
82 repository to remote. Return an integer:
83 - None means nothing to push
83 - None means nothing to push
84 - 0 means HTTP error
84 - 0 means HTTP error
85 - 1 means we pushed and remote head count is unchanged *or*
85 - 1 means we pushed and remote head count is unchanged *or*
86 we have outgoing changesets but refused to push
86 we have outgoing changesets but refused to push
87 - other values as described by addchangegroup()
87 - other values as described by addchangegroup()
88 '''
88 '''
89 pushop = pushoperation(repo, remote, force, revs, newbranch)
89 pushop = pushoperation(repo, remote, force, revs, newbranch)
90 if pushop.remote.local():
90 if pushop.remote.local():
91 missing = (set(pushop.repo.requirements)
91 missing = (set(pushop.repo.requirements)
92 - pushop.remote.local().supported)
92 - pushop.remote.local().supported)
93 if missing:
93 if missing:
94 msg = _("required features are not"
94 msg = _("required features are not"
95 " supported in the destination:"
95 " supported in the destination:"
96 " %s") % (', '.join(sorted(missing)))
96 " %s") % (', '.join(sorted(missing)))
97 raise util.Abort(msg)
97 raise util.Abort(msg)
98
98
99 # there are two ways to push to remote repo:
99 # there are two ways to push to remote repo:
100 #
100 #
101 # addchangegroup assumes local user can lock remote
101 # addchangegroup assumes local user can lock remote
102 # repo (local filesystem, old ssh servers).
102 # repo (local filesystem, old ssh servers).
103 #
103 #
104 # unbundle assumes local user cannot lock remote repo (new ssh
104 # unbundle assumes local user cannot lock remote repo (new ssh
105 # servers, http servers).
105 # servers, http servers).
106
106
107 if not pushop.remote.canpush():
107 if not pushop.remote.canpush():
108 raise util.Abort(_("destination does not support push"))
108 raise util.Abort(_("destination does not support push"))
109 # get local lock as we might write phase data
109 # get local lock as we might write phase data
110 locallock = None
110 locallock = None
111 try:
111 try:
112 locallock = pushop.repo.lock()
112 locallock = pushop.repo.lock()
113 pushop.locallocked = True
113 pushop.locallocked = True
114 except IOError, err:
114 except IOError, err:
115 pushop.locallocked = False
115 pushop.locallocked = False
116 if err.errno != errno.EACCES:
116 if err.errno != errno.EACCES:
117 raise
117 raise
118 # source repo cannot be locked.
118 # source repo cannot be locked.
119 # We do not abort the push, but just disable the local phase
119 # We do not abort the push, but just disable the local phase
120 # synchronisation.
120 # synchronisation.
121 msg = 'cannot lock source repository: %s\n' % err
121 msg = 'cannot lock source repository: %s\n' % err
122 pushop.ui.debug(msg)
122 pushop.ui.debug(msg)
123 try:
123 try:
124 pushop.repo.checkpush(pushop)
124 pushop.repo.checkpush(pushop)
125 lock = None
125 lock = None
126 unbundle = pushop.remote.capable('unbundle')
126 unbundle = pushop.remote.capable('unbundle')
127 if not unbundle:
127 if not unbundle:
128 lock = pushop.remote.lock()
128 lock = pushop.remote.lock()
129 try:
129 try:
130 _pushdiscovery(pushop)
130 _pushdiscovery(pushop)
131 if _pushcheckoutgoing(pushop):
131 if _pushcheckoutgoing(pushop):
132 pushop.repo.prepushoutgoinghooks(pushop.repo,
132 pushop.repo.prepushoutgoinghooks(pushop.repo,
133 pushop.remote,
133 pushop.remote,
134 pushop.outgoing)
134 pushop.outgoing)
135 if (pushop.repo.ui.configbool('experimental', 'bundle2-exp',
135 if (pushop.repo.ui.configbool('experimental', 'bundle2-exp',
136 False)
136 False)
137 and pushop.remote.capable('bundle2-exp')):
137 and pushop.remote.capable('bundle2-exp')):
138 _pushbundle2(pushop)
138 _pushbundle2(pushop)
139 else:
139 else:
140 _pushchangeset(pushop)
140 _pushchangeset(pushop)
141 _pushcomputecommonheads(pushop)
141 _pushcomputecommonheads(pushop)
142 _pushsyncphase(pushop)
142 _pushsyncphase(pushop)
143 _pushobsolete(pushop)
143 _pushobsolete(pushop)
144 finally:
144 finally:
145 if lock is not None:
145 if lock is not None:
146 lock.release()
146 lock.release()
147 finally:
147 finally:
148 if locallock is not None:
148 if locallock is not None:
149 locallock.release()
149 locallock.release()
150
150
151 _pushbookmark(pushop)
151 _pushbookmark(pushop)
152 return pushop.ret
152 return pushop.ret
153
153
154 def _pushdiscovery(pushop):
154 def _pushdiscovery(pushop):
155 # discovery
155 # discovery
156 unfi = pushop.repo.unfiltered()
156 unfi = pushop.repo.unfiltered()
157 fci = discovery.findcommonincoming
157 fci = discovery.findcommonincoming
158 commoninc = fci(unfi, pushop.remote, force=pushop.force)
158 commoninc = fci(unfi, pushop.remote, force=pushop.force)
159 common, inc, remoteheads = commoninc
159 common, inc, remoteheads = commoninc
160 fco = discovery.findcommonoutgoing
160 fco = discovery.findcommonoutgoing
161 outgoing = fco(unfi, pushop.remote, onlyheads=pushop.revs,
161 outgoing = fco(unfi, pushop.remote, onlyheads=pushop.revs,
162 commoninc=commoninc, force=pushop.force)
162 commoninc=commoninc, force=pushop.force)
163 pushop.outgoing = outgoing
163 pushop.outgoing = outgoing
164 pushop.remoteheads = remoteheads
164 pushop.remoteheads = remoteheads
165 pushop.incoming = inc
165 pushop.incoming = inc
166
166
167 def _pushcheckoutgoing(pushop):
167 def _pushcheckoutgoing(pushop):
168 outgoing = pushop.outgoing
168 outgoing = pushop.outgoing
169 unfi = pushop.repo.unfiltered()
169 unfi = pushop.repo.unfiltered()
170 if not outgoing.missing:
170 if not outgoing.missing:
171 # nothing to push
171 # nothing to push
172 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
172 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
173 return False
173 return False
174 # something to push
174 # something to push
175 if not pushop.force:
175 if not pushop.force:
176 # if repo.obsstore == False --> no obsolete
176 # if repo.obsstore == False --> no obsolete
177 # then, save the iteration
177 # then, save the iteration
178 if unfi.obsstore:
178 if unfi.obsstore:
179 # this message are here for 80 char limit reason
179 # this message are here for 80 char limit reason
180 mso = _("push includes obsolete changeset: %s!")
180 mso = _("push includes obsolete changeset: %s!")
181 mst = "push includes %s changeset: %s!"
181 mst = "push includes %s changeset: %s!"
182 # plain versions for i18n tool to detect them
182 # plain versions for i18n tool to detect them
183 _("push includes unstable changeset: %s!")
183 _("push includes unstable changeset: %s!")
184 _("push includes bumped changeset: %s!")
184 _("push includes bumped changeset: %s!")
185 _("push includes divergent changeset: %s!")
185 _("push includes divergent changeset: %s!")
186 # If we are to push if there is at least one
186 # If we are to push if there is at least one
187 # obsolete or unstable changeset in missing, at
187 # obsolete or unstable changeset in missing, at
188 # least one of the missinghead will be obsolete or
188 # least one of the missinghead will be obsolete or
189 # unstable. So checking heads only is ok
189 # unstable. So checking heads only is ok
190 for node in outgoing.missingheads:
190 for node in outgoing.missingheads:
191 ctx = unfi[node]
191 ctx = unfi[node]
192 if ctx.obsolete():
192 if ctx.obsolete():
193 raise util.Abort(mso % ctx)
193 raise util.Abort(mso % ctx)
194 elif ctx.troubled():
194 elif ctx.troubled():
195 raise util.Abort(_(mst)
195 raise util.Abort(_(mst)
196 % (ctx.troubles()[0],
196 % (ctx.troubles()[0],
197 ctx))
197 ctx))
198 newbm = pushop.ui.configlist('bookmarks', 'pushing')
198 newbm = pushop.ui.configlist('bookmarks', 'pushing')
199 discovery.checkheads(unfi, pushop.remote, outgoing,
199 discovery.checkheads(unfi, pushop.remote, outgoing,
200 pushop.remoteheads,
200 pushop.remoteheads,
201 pushop.newbranch,
201 pushop.newbranch,
202 bool(pushop.incoming),
202 bool(pushop.incoming),
203 newbm)
203 newbm)
204 return True
204 return True
205
205
206 def _pushbundle2(pushop):
206 def _pushbundle2(pushop):
207 """push data to the remote using bundle2
207 """push data to the remote using bundle2
208
208
209 The only currently supported type of data is changegroup but this will
209 The only currently supported type of data is changegroup but this will
210 evolve in the future."""
210 evolve in the future."""
211 capsblob = urllib.unquote(pushop.remote.capable('bundle2-exp'))
211 bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
212 caps = bundle2.decodecaps(capsblob)
213 bundler = bundle2.bundle20(pushop.ui, caps)
214 # create reply capability
212 # create reply capability
215 capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
213 capsblob = bundle2.encodecaps(pushop.repo.bundle2caps)
216 bundler.newpart('b2x:replycaps', data=capsblob)
214 bundler.newpart('b2x:replycaps', data=capsblob)
217 # Send known heads to the server for race detection.
215 # Send known heads to the server for race detection.
218 if not pushop.force:
216 if not pushop.force:
219 bundler.newpart('B2X:CHECK:HEADS', data=iter(pushop.remoteheads))
217 bundler.newpart('B2X:CHECK:HEADS', data=iter(pushop.remoteheads))
220 extrainfo = _pushbundle2extraparts(pushop, bundler)
218 extrainfo = _pushbundle2extraparts(pushop, bundler)
221 # add the changegroup bundle
219 # add the changegroup bundle
222 cg = changegroup.getlocalbundle(pushop.repo, 'push', pushop.outgoing)
220 cg = changegroup.getlocalbundle(pushop.repo, 'push', pushop.outgoing)
223 cgpart = bundler.newpart('B2X:CHANGEGROUP', data=cg.getchunks())
221 cgpart = bundler.newpart('B2X:CHANGEGROUP', data=cg.getchunks())
224 stream = util.chunkbuffer(bundler.getchunks())
222 stream = util.chunkbuffer(bundler.getchunks())
225 try:
223 try:
226 reply = pushop.remote.unbundle(stream, ['force'], 'push')
224 reply = pushop.remote.unbundle(stream, ['force'], 'push')
227 except error.BundleValueError, exc:
225 except error.BundleValueError, exc:
228 raise util.Abort('missing support for %s' % exc)
226 raise util.Abort('missing support for %s' % exc)
229 try:
227 try:
230 op = bundle2.processbundle(pushop.repo, reply)
228 op = bundle2.processbundle(pushop.repo, reply)
231 except error.BundleValueError, exc:
229 except error.BundleValueError, exc:
232 raise util.Abort('missing support for %s' % exc)
230 raise util.Abort('missing support for %s' % exc)
233 cgreplies = op.records.getreplies(cgpart.id)
231 cgreplies = op.records.getreplies(cgpart.id)
234 assert len(cgreplies['changegroup']) == 1
232 assert len(cgreplies['changegroup']) == 1
235 pushop.ret = cgreplies['changegroup'][0]['return']
233 pushop.ret = cgreplies['changegroup'][0]['return']
236 _pushbundle2extrareply(pushop, op, extrainfo)
234 _pushbundle2extrareply(pushop, op, extrainfo)
237
235
238 def _pushbundle2extraparts(pushop, bundler):
236 def _pushbundle2extraparts(pushop, bundler):
239 """hook function to let extensions add parts
237 """hook function to let extensions add parts
240
238
241 Return a dict to let extensions pass data to the reply processing.
239 Return a dict to let extensions pass data to the reply processing.
242 """
240 """
243 return {}
241 return {}
244
242
245 def _pushbundle2extrareply(pushop, op, extrainfo):
243 def _pushbundle2extrareply(pushop, op, extrainfo):
246 """hook function to let extensions react to part replies
244 """hook function to let extensions react to part replies
247
245
248 The dict from _pushbundle2extrareply is fed to this function.
246 The dict from _pushbundle2extrareply is fed to this function.
249 """
247 """
250 pass
248 pass
251
249
252 def _pushchangeset(pushop):
250 def _pushchangeset(pushop):
253 """Make the actual push of changeset bundle to remote repo"""
251 """Make the actual push of changeset bundle to remote repo"""
254 outgoing = pushop.outgoing
252 outgoing = pushop.outgoing
255 unbundle = pushop.remote.capable('unbundle')
253 unbundle = pushop.remote.capable('unbundle')
256 # TODO: get bundlecaps from remote
254 # TODO: get bundlecaps from remote
257 bundlecaps = None
255 bundlecaps = None
258 # create a changegroup from local
256 # create a changegroup from local
259 if pushop.revs is None and not (outgoing.excluded
257 if pushop.revs is None and not (outgoing.excluded
260 or pushop.repo.changelog.filteredrevs):
258 or pushop.repo.changelog.filteredrevs):
261 # push everything,
259 # push everything,
262 # use the fast path, no race possible on push
260 # use the fast path, no race possible on push
263 bundler = changegroup.bundle10(pushop.repo, bundlecaps)
261 bundler = changegroup.bundle10(pushop.repo, bundlecaps)
264 cg = changegroup.getsubset(pushop.repo,
262 cg = changegroup.getsubset(pushop.repo,
265 outgoing,
263 outgoing,
266 bundler,
264 bundler,
267 'push',
265 'push',
268 fastpath=True)
266 fastpath=True)
269 else:
267 else:
270 cg = changegroup.getlocalbundle(pushop.repo, 'push', outgoing,
268 cg = changegroup.getlocalbundle(pushop.repo, 'push', outgoing,
271 bundlecaps)
269 bundlecaps)
272
270
273 # apply changegroup to remote
271 # apply changegroup to remote
274 if unbundle:
272 if unbundle:
275 # local repo finds heads on server, finds out what
273 # local repo finds heads on server, finds out what
276 # revs it must push. once revs transferred, if server
274 # revs it must push. once revs transferred, if server
277 # finds it has different heads (someone else won
275 # finds it has different heads (someone else won
278 # commit/push race), server aborts.
276 # commit/push race), server aborts.
279 if pushop.force:
277 if pushop.force:
280 remoteheads = ['force']
278 remoteheads = ['force']
281 else:
279 else:
282 remoteheads = pushop.remoteheads
280 remoteheads = pushop.remoteheads
283 # ssh: return remote's addchangegroup()
281 # ssh: return remote's addchangegroup()
284 # http: return remote's addchangegroup() or 0 for error
282 # http: return remote's addchangegroup() or 0 for error
285 pushop.ret = pushop.remote.unbundle(cg, remoteheads,
283 pushop.ret = pushop.remote.unbundle(cg, remoteheads,
286 'push')
284 'push')
287 else:
285 else:
288 # we return an integer indicating remote head count
286 # we return an integer indicating remote head count
289 # change
287 # change
290 pushop.ret = pushop.remote.addchangegroup(cg, 'push', pushop.repo.url())
288 pushop.ret = pushop.remote.addchangegroup(cg, 'push', pushop.repo.url())
291
289
292 def _pushcomputecommonheads(pushop):
290 def _pushcomputecommonheads(pushop):
293 unfi = pushop.repo.unfiltered()
291 unfi = pushop.repo.unfiltered()
294 if pushop.ret:
292 if pushop.ret:
295 # push succeed, synchronize target of the push
293 # push succeed, synchronize target of the push
296 cheads = pushop.outgoing.missingheads
294 cheads = pushop.outgoing.missingheads
297 elif pushop.revs is None:
295 elif pushop.revs is None:
298 # All out push fails. synchronize all common
296 # All out push fails. synchronize all common
299 cheads = pushop.outgoing.commonheads
297 cheads = pushop.outgoing.commonheads
300 else:
298 else:
301 # I want cheads = heads(::missingheads and ::commonheads)
299 # I want cheads = heads(::missingheads and ::commonheads)
302 # (missingheads is revs with secret changeset filtered out)
300 # (missingheads is revs with secret changeset filtered out)
303 #
301 #
304 # This can be expressed as:
302 # This can be expressed as:
305 # cheads = ( (missingheads and ::commonheads)
303 # cheads = ( (missingheads and ::commonheads)
306 # + (commonheads and ::missingheads))"
304 # + (commonheads and ::missingheads))"
307 # )
305 # )
308 #
306 #
309 # while trying to push we already computed the following:
307 # while trying to push we already computed the following:
310 # common = (::commonheads)
308 # common = (::commonheads)
311 # missing = ((commonheads::missingheads) - commonheads)
309 # missing = ((commonheads::missingheads) - commonheads)
312 #
310 #
313 # We can pick:
311 # We can pick:
314 # * missingheads part of common (::commonheads)
312 # * missingheads part of common (::commonheads)
315 common = set(pushop.outgoing.common)
313 common = set(pushop.outgoing.common)
316 nm = pushop.repo.changelog.nodemap
314 nm = pushop.repo.changelog.nodemap
317 cheads = [node for node in pushop.revs if nm[node] in common]
315 cheads = [node for node in pushop.revs if nm[node] in common]
318 # and
316 # and
319 # * commonheads parents on missing
317 # * commonheads parents on missing
320 revset = unfi.set('%ln and parents(roots(%ln))',
318 revset = unfi.set('%ln and parents(roots(%ln))',
321 pushop.outgoing.commonheads,
319 pushop.outgoing.commonheads,
322 pushop.outgoing.missing)
320 pushop.outgoing.missing)
323 cheads.extend(c.node() for c in revset)
321 cheads.extend(c.node() for c in revset)
324 pushop.commonheads = cheads
322 pushop.commonheads = cheads
325
323
326 def _pushsyncphase(pushop):
324 def _pushsyncphase(pushop):
327 """synchronise phase information locally and remotely"""
325 """synchronise phase information locally and remotely"""
328 unfi = pushop.repo.unfiltered()
326 unfi = pushop.repo.unfiltered()
329 cheads = pushop.commonheads
327 cheads = pushop.commonheads
330 # even when we don't push, exchanging phase data is useful
328 # even when we don't push, exchanging phase data is useful
331 remotephases = pushop.remote.listkeys('phases')
329 remotephases = pushop.remote.listkeys('phases')
332 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
330 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
333 and remotephases # server supports phases
331 and remotephases # server supports phases
334 and pushop.ret is None # nothing was pushed
332 and pushop.ret is None # nothing was pushed
335 and remotephases.get('publishing', False)):
333 and remotephases.get('publishing', False)):
336 # When:
334 # When:
337 # - this is a subrepo push
335 # - this is a subrepo push
338 # - and remote support phase
336 # - and remote support phase
339 # - and no changeset was pushed
337 # - and no changeset was pushed
340 # - and remote is publishing
338 # - and remote is publishing
341 # We may be in issue 3871 case!
339 # We may be in issue 3871 case!
342 # We drop the possible phase synchronisation done by
340 # We drop the possible phase synchronisation done by
343 # courtesy to publish changesets possibly locally draft
341 # courtesy to publish changesets possibly locally draft
344 # on the remote.
342 # on the remote.
345 remotephases = {'publishing': 'True'}
343 remotephases = {'publishing': 'True'}
346 if not remotephases: # old server or public only reply from non-publishing
344 if not remotephases: # old server or public only reply from non-publishing
347 _localphasemove(pushop, cheads)
345 _localphasemove(pushop, cheads)
348 # don't push any phase data as there is nothing to push
346 # don't push any phase data as there is nothing to push
349 else:
347 else:
350 ana = phases.analyzeremotephases(pushop.repo, cheads,
348 ana = phases.analyzeremotephases(pushop.repo, cheads,
351 remotephases)
349 remotephases)
352 pheads, droots = ana
350 pheads, droots = ana
353 ### Apply remote phase on local
351 ### Apply remote phase on local
354 if remotephases.get('publishing', False):
352 if remotephases.get('publishing', False):
355 _localphasemove(pushop, cheads)
353 _localphasemove(pushop, cheads)
356 else: # publish = False
354 else: # publish = False
357 _localphasemove(pushop, pheads)
355 _localphasemove(pushop, pheads)
358 _localphasemove(pushop, cheads, phases.draft)
356 _localphasemove(pushop, cheads, phases.draft)
359 ### Apply local phase on remote
357 ### Apply local phase on remote
360
358
361 # Get the list of all revs draft on remote by public here.
359 # Get the list of all revs draft on remote by public here.
362 # XXX Beware that revset break if droots is not strictly
360 # XXX Beware that revset break if droots is not strictly
363 # XXX root we may want to ensure it is but it is costly
361 # XXX root we may want to ensure it is but it is costly
364 outdated = unfi.set('heads((%ln::%ln) and public())',
362 outdated = unfi.set('heads((%ln::%ln) and public())',
365 droots, cheads)
363 droots, cheads)
366 for newremotehead in outdated:
364 for newremotehead in outdated:
367 r = pushop.remote.pushkey('phases',
365 r = pushop.remote.pushkey('phases',
368 newremotehead.hex(),
366 newremotehead.hex(),
369 str(phases.draft),
367 str(phases.draft),
370 str(phases.public))
368 str(phases.public))
371 if not r:
369 if not r:
372 pushop.ui.warn(_('updating %s to public failed!\n')
370 pushop.ui.warn(_('updating %s to public failed!\n')
373 % newremotehead)
371 % newremotehead)
374
372
375 def _localphasemove(pushop, nodes, phase=phases.public):
373 def _localphasemove(pushop, nodes, phase=phases.public):
376 """move <nodes> to <phase> in the local source repo"""
374 """move <nodes> to <phase> in the local source repo"""
377 if pushop.locallocked:
375 if pushop.locallocked:
378 phases.advanceboundary(pushop.repo, phase, nodes)
376 phases.advanceboundary(pushop.repo, phase, nodes)
379 else:
377 else:
380 # repo is not locked, do not change any phases!
378 # repo is not locked, do not change any phases!
381 # Informs the user that phases should have been moved when
379 # Informs the user that phases should have been moved when
382 # applicable.
380 # applicable.
383 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
381 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
384 phasestr = phases.phasenames[phase]
382 phasestr = phases.phasenames[phase]
385 if actualmoves:
383 if actualmoves:
386 pushop.ui.status(_('cannot lock source repo, skipping '
384 pushop.ui.status(_('cannot lock source repo, skipping '
387 'local %s phase update\n') % phasestr)
385 'local %s phase update\n') % phasestr)
388
386
389 def _pushobsolete(pushop):
387 def _pushobsolete(pushop):
390 """utility function to push obsolete markers to a remote"""
388 """utility function to push obsolete markers to a remote"""
391 pushop.ui.debug('try to push obsolete markers to remote\n')
389 pushop.ui.debug('try to push obsolete markers to remote\n')
392 repo = pushop.repo
390 repo = pushop.repo
393 remote = pushop.remote
391 remote = pushop.remote
394 if (obsolete._enabled and repo.obsstore and
392 if (obsolete._enabled and repo.obsstore and
395 'obsolete' in remote.listkeys('namespaces')):
393 'obsolete' in remote.listkeys('namespaces')):
396 rslts = []
394 rslts = []
397 remotedata = repo.listkeys('obsolete')
395 remotedata = repo.listkeys('obsolete')
398 for key in sorted(remotedata, reverse=True):
396 for key in sorted(remotedata, reverse=True):
399 # reverse sort to ensure we end with dump0
397 # reverse sort to ensure we end with dump0
400 data = remotedata[key]
398 data = remotedata[key]
401 rslts.append(remote.pushkey('obsolete', key, '', data))
399 rslts.append(remote.pushkey('obsolete', key, '', data))
402 if [r for r in rslts if not r]:
400 if [r for r in rslts if not r]:
403 msg = _('failed to push some obsolete markers!\n')
401 msg = _('failed to push some obsolete markers!\n')
404 repo.ui.warn(msg)
402 repo.ui.warn(msg)
405
403
406 def _pushbookmark(pushop):
404 def _pushbookmark(pushop):
407 """Update bookmark position on remote"""
405 """Update bookmark position on remote"""
408 ui = pushop.ui
406 ui = pushop.ui
409 repo = pushop.repo.unfiltered()
407 repo = pushop.repo.unfiltered()
410 remote = pushop.remote
408 remote = pushop.remote
411 ui.debug("checking for updated bookmarks\n")
409 ui.debug("checking for updated bookmarks\n")
412 revnums = map(repo.changelog.rev, pushop.revs or [])
410 revnums = map(repo.changelog.rev, pushop.revs or [])
413 ancestors = [a for a in repo.changelog.ancestors(revnums, inclusive=True)]
411 ancestors = [a for a in repo.changelog.ancestors(revnums, inclusive=True)]
414 (addsrc, adddst, advsrc, advdst, diverge, differ, invalid
412 (addsrc, adddst, advsrc, advdst, diverge, differ, invalid
415 ) = bookmarks.compare(repo, repo._bookmarks, remote.listkeys('bookmarks'),
413 ) = bookmarks.compare(repo, repo._bookmarks, remote.listkeys('bookmarks'),
416 srchex=hex)
414 srchex=hex)
417
415
418 for b, scid, dcid in advsrc:
416 for b, scid, dcid in advsrc:
419 if ancestors and repo[scid].rev() not in ancestors:
417 if ancestors and repo[scid].rev() not in ancestors:
420 continue
418 continue
421 if remote.pushkey('bookmarks', b, dcid, scid):
419 if remote.pushkey('bookmarks', b, dcid, scid):
422 ui.status(_("updating bookmark %s\n") % b)
420 ui.status(_("updating bookmark %s\n") % b)
423 else:
421 else:
424 ui.warn(_('updating bookmark %s failed!\n') % b)
422 ui.warn(_('updating bookmark %s failed!\n') % b)
425
423
426 class pulloperation(object):
424 class pulloperation(object):
427 """A object that represent a single pull operation
425 """A object that represent a single pull operation
428
426
429 It purpose is to carry push related state and very common operation.
427 It purpose is to carry push related state and very common operation.
430
428
431 A new should be created at the beginning of each pull and discarded
429 A new should be created at the beginning of each pull and discarded
432 afterward.
430 afterward.
433 """
431 """
434
432
435 def __init__(self, repo, remote, heads=None, force=False):
433 def __init__(self, repo, remote, heads=None, force=False):
436 # repo we pull into
434 # repo we pull into
437 self.repo = repo
435 self.repo = repo
438 # repo we pull from
436 # repo we pull from
439 self.remote = remote
437 self.remote = remote
440 # revision we try to pull (None is "all")
438 # revision we try to pull (None is "all")
441 self.heads = heads
439 self.heads = heads
442 # do we force pull?
440 # do we force pull?
443 self.force = force
441 self.force = force
444 # the name the pull transaction
442 # the name the pull transaction
445 self._trname = 'pull\n' + util.hidepassword(remote.url())
443 self._trname = 'pull\n' + util.hidepassword(remote.url())
446 # hold the transaction once created
444 # hold the transaction once created
447 self._tr = None
445 self._tr = None
448 # set of common changeset between local and remote before pull
446 # set of common changeset between local and remote before pull
449 self.common = None
447 self.common = None
450 # set of pulled head
448 # set of pulled head
451 self.rheads = None
449 self.rheads = None
452 # list of missing changeset to fetch remotely
450 # list of missing changeset to fetch remotely
453 self.fetch = None
451 self.fetch = None
454 # result of changegroup pulling (used as return code by pull)
452 # result of changegroup pulling (used as return code by pull)
455 self.cgresult = None
453 self.cgresult = None
456 # list of step remaining todo (related to future bundle2 usage)
454 # list of step remaining todo (related to future bundle2 usage)
457 self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])
455 self.todosteps = set(['changegroup', 'phases', 'obsmarkers'])
458
456
459 @util.propertycache
457 @util.propertycache
460 def pulledsubset(self):
458 def pulledsubset(self):
461 """heads of the set of changeset target by the pull"""
459 """heads of the set of changeset target by the pull"""
462 # compute target subset
460 # compute target subset
463 if self.heads is None:
461 if self.heads is None:
464 # We pulled every thing possible
462 # We pulled every thing possible
465 # sync on everything common
463 # sync on everything common
466 c = set(self.common)
464 c = set(self.common)
467 ret = list(self.common)
465 ret = list(self.common)
468 for n in self.rheads:
466 for n in self.rheads:
469 if n not in c:
467 if n not in c:
470 ret.append(n)
468 ret.append(n)
471 return ret
469 return ret
472 else:
470 else:
473 # We pulled a specific subset
471 # We pulled a specific subset
474 # sync on this subset
472 # sync on this subset
475 return self.heads
473 return self.heads
476
474
477 def gettransaction(self):
475 def gettransaction(self):
478 """get appropriate pull transaction, creating it if needed"""
476 """get appropriate pull transaction, creating it if needed"""
479 if self._tr is None:
477 if self._tr is None:
480 self._tr = self.repo.transaction(self._trname)
478 self._tr = self.repo.transaction(self._trname)
481 return self._tr
479 return self._tr
482
480
483 def closetransaction(self):
481 def closetransaction(self):
484 """close transaction if created"""
482 """close transaction if created"""
485 if self._tr is not None:
483 if self._tr is not None:
486 self._tr.close()
484 self._tr.close()
487
485
488 def releasetransaction(self):
486 def releasetransaction(self):
489 """release transaction if created"""
487 """release transaction if created"""
490 if self._tr is not None:
488 if self._tr is not None:
491 self._tr.release()
489 self._tr.release()
492
490
493 def pull(repo, remote, heads=None, force=False):
491 def pull(repo, remote, heads=None, force=False):
494 pullop = pulloperation(repo, remote, heads, force)
492 pullop = pulloperation(repo, remote, heads, force)
495 if pullop.remote.local():
493 if pullop.remote.local():
496 missing = set(pullop.remote.requirements) - pullop.repo.supported
494 missing = set(pullop.remote.requirements) - pullop.repo.supported
497 if missing:
495 if missing:
498 msg = _("required features are not"
496 msg = _("required features are not"
499 " supported in the destination:"
497 " supported in the destination:"
500 " %s") % (', '.join(sorted(missing)))
498 " %s") % (', '.join(sorted(missing)))
501 raise util.Abort(msg)
499 raise util.Abort(msg)
502
500
503 lock = pullop.repo.lock()
501 lock = pullop.repo.lock()
504 try:
502 try:
505 _pulldiscovery(pullop)
503 _pulldiscovery(pullop)
506 if (pullop.repo.ui.configbool('experimental', 'bundle2-exp', False)
504 if (pullop.repo.ui.configbool('experimental', 'bundle2-exp', False)
507 and pullop.remote.capable('bundle2-exp')):
505 and pullop.remote.capable('bundle2-exp')):
508 _pullbundle2(pullop)
506 _pullbundle2(pullop)
509 if 'changegroup' in pullop.todosteps:
507 if 'changegroup' in pullop.todosteps:
510 _pullchangeset(pullop)
508 _pullchangeset(pullop)
511 if 'phases' in pullop.todosteps:
509 if 'phases' in pullop.todosteps:
512 _pullphase(pullop)
510 _pullphase(pullop)
513 if 'obsmarkers' in pullop.todosteps:
511 if 'obsmarkers' in pullop.todosteps:
514 _pullobsolete(pullop)
512 _pullobsolete(pullop)
515 pullop.closetransaction()
513 pullop.closetransaction()
516 finally:
514 finally:
517 pullop.releasetransaction()
515 pullop.releasetransaction()
518 lock.release()
516 lock.release()
519
517
520 return pullop.cgresult
518 return pullop.cgresult
521
519
522 def _pulldiscovery(pullop):
520 def _pulldiscovery(pullop):
523 """discovery phase for the pull
521 """discovery phase for the pull
524
522
525 Current handle changeset discovery only, will change handle all discovery
523 Current handle changeset discovery only, will change handle all discovery
526 at some point."""
524 at some point."""
527 tmp = discovery.findcommonincoming(pullop.repo.unfiltered(),
525 tmp = discovery.findcommonincoming(pullop.repo.unfiltered(),
528 pullop.remote,
526 pullop.remote,
529 heads=pullop.heads,
527 heads=pullop.heads,
530 force=pullop.force)
528 force=pullop.force)
531 pullop.common, pullop.fetch, pullop.rheads = tmp
529 pullop.common, pullop.fetch, pullop.rheads = tmp
532
530
533 def _pullbundle2(pullop):
531 def _pullbundle2(pullop):
534 """pull data using bundle2
532 """pull data using bundle2
535
533
536 For now, the only supported data are changegroup."""
534 For now, the only supported data are changegroup."""
537 kwargs = {'bundlecaps': set(['HG2X'])}
535 kwargs = {'bundlecaps': set(['HG2X'])}
538 capsblob = bundle2.encodecaps(pullop.repo.bundle2caps)
536 capsblob = bundle2.encodecaps(pullop.repo.bundle2caps)
539 kwargs['bundlecaps'].add('bundle2=' + urllib.quote(capsblob))
537 kwargs['bundlecaps'].add('bundle2=' + urllib.quote(capsblob))
540 # pulling changegroup
538 # pulling changegroup
541 pullop.todosteps.remove('changegroup')
539 pullop.todosteps.remove('changegroup')
542
540
543 kwargs['common'] = pullop.common
541 kwargs['common'] = pullop.common
544 kwargs['heads'] = pullop.heads or pullop.rheads
542 kwargs['heads'] = pullop.heads or pullop.rheads
545 if not pullop.fetch:
543 if not pullop.fetch:
546 pullop.repo.ui.status(_("no changes found\n"))
544 pullop.repo.ui.status(_("no changes found\n"))
547 pullop.cgresult = 0
545 pullop.cgresult = 0
548 else:
546 else:
549 if pullop.heads is None and list(pullop.common) == [nullid]:
547 if pullop.heads is None and list(pullop.common) == [nullid]:
550 pullop.repo.ui.status(_("requesting all changes\n"))
548 pullop.repo.ui.status(_("requesting all changes\n"))
551 _pullbundle2extraprepare(pullop, kwargs)
549 _pullbundle2extraprepare(pullop, kwargs)
552 if kwargs.keys() == ['format']:
550 if kwargs.keys() == ['format']:
553 return # nothing to pull
551 return # nothing to pull
554 bundle = pullop.remote.getbundle('pull', **kwargs)
552 bundle = pullop.remote.getbundle('pull', **kwargs)
555 try:
553 try:
556 op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
554 op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
557 except error.BundleValueError, exc:
555 except error.BundleValueError, exc:
558 raise util.Abort('missing support for %s' % exc)
556 raise util.Abort('missing support for %s' % exc)
559
557
560 if pullop.fetch:
558 if pullop.fetch:
561 assert len(op.records['changegroup']) == 1
559 assert len(op.records['changegroup']) == 1
562 pullop.cgresult = op.records['changegroup'][0]['return']
560 pullop.cgresult = op.records['changegroup'][0]['return']
563
561
564 def _pullbundle2extraprepare(pullop, kwargs):
562 def _pullbundle2extraprepare(pullop, kwargs):
565 """hook function so that extensions can extend the getbundle call"""
563 """hook function so that extensions can extend the getbundle call"""
566 pass
564 pass
567
565
568 def _pullchangeset(pullop):
566 def _pullchangeset(pullop):
569 """pull changeset from unbundle into the local repo"""
567 """pull changeset from unbundle into the local repo"""
570 # We delay the open of the transaction as late as possible so we
568 # We delay the open of the transaction as late as possible so we
571 # don't open transaction for nothing or you break future useful
569 # don't open transaction for nothing or you break future useful
572 # rollback call
570 # rollback call
573 pullop.todosteps.remove('changegroup')
571 pullop.todosteps.remove('changegroup')
574 if not pullop.fetch:
572 if not pullop.fetch:
575 pullop.repo.ui.status(_("no changes found\n"))
573 pullop.repo.ui.status(_("no changes found\n"))
576 pullop.cgresult = 0
574 pullop.cgresult = 0
577 return
575 return
578 pullop.gettransaction()
576 pullop.gettransaction()
579 if pullop.heads is None and list(pullop.common) == [nullid]:
577 if pullop.heads is None and list(pullop.common) == [nullid]:
580 pullop.repo.ui.status(_("requesting all changes\n"))
578 pullop.repo.ui.status(_("requesting all changes\n"))
581 elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
579 elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
582 # issue1320, avoid a race if remote changed after discovery
580 # issue1320, avoid a race if remote changed after discovery
583 pullop.heads = pullop.rheads
581 pullop.heads = pullop.rheads
584
582
585 if pullop.remote.capable('getbundle'):
583 if pullop.remote.capable('getbundle'):
586 # TODO: get bundlecaps from remote
584 # TODO: get bundlecaps from remote
587 cg = pullop.remote.getbundle('pull', common=pullop.common,
585 cg = pullop.remote.getbundle('pull', common=pullop.common,
588 heads=pullop.heads or pullop.rheads)
586 heads=pullop.heads or pullop.rheads)
589 elif pullop.heads is None:
587 elif pullop.heads is None:
590 cg = pullop.remote.changegroup(pullop.fetch, 'pull')
588 cg = pullop.remote.changegroup(pullop.fetch, 'pull')
591 elif not pullop.remote.capable('changegroupsubset'):
589 elif not pullop.remote.capable('changegroupsubset'):
592 raise util.Abort(_("partial pull cannot be done because "
590 raise util.Abort(_("partial pull cannot be done because "
593 "other repository doesn't support "
591 "other repository doesn't support "
594 "changegroupsubset."))
592 "changegroupsubset."))
595 else:
593 else:
596 cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
594 cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
597 pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
595 pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
598 pullop.remote.url())
596 pullop.remote.url())
599
597
600 def _pullphase(pullop):
598 def _pullphase(pullop):
601 # Get remote phases data from remote
599 # Get remote phases data from remote
602 pullop.todosteps.remove('phases')
600 pullop.todosteps.remove('phases')
603 remotephases = pullop.remote.listkeys('phases')
601 remotephases = pullop.remote.listkeys('phases')
604 publishing = bool(remotephases.get('publishing', False))
602 publishing = bool(remotephases.get('publishing', False))
605 if remotephases and not publishing:
603 if remotephases and not publishing:
606 # remote is new and unpublishing
604 # remote is new and unpublishing
607 pheads, _dr = phases.analyzeremotephases(pullop.repo,
605 pheads, _dr = phases.analyzeremotephases(pullop.repo,
608 pullop.pulledsubset,
606 pullop.pulledsubset,
609 remotephases)
607 remotephases)
610 phases.advanceboundary(pullop.repo, phases.public, pheads)
608 phases.advanceboundary(pullop.repo, phases.public, pheads)
611 phases.advanceboundary(pullop.repo, phases.draft,
609 phases.advanceboundary(pullop.repo, phases.draft,
612 pullop.pulledsubset)
610 pullop.pulledsubset)
613 else:
611 else:
614 # Remote is old or publishing all common changesets
612 # Remote is old or publishing all common changesets
615 # should be seen as public
613 # should be seen as public
616 phases.advanceboundary(pullop.repo, phases.public,
614 phases.advanceboundary(pullop.repo, phases.public,
617 pullop.pulledsubset)
615 pullop.pulledsubset)
618
616
619 def _pullobsolete(pullop):
617 def _pullobsolete(pullop):
620 """utility function to pull obsolete markers from a remote
618 """utility function to pull obsolete markers from a remote
621
619
622 The `gettransaction` is function that return the pull transaction, creating
620 The `gettransaction` is function that return the pull transaction, creating
623 one if necessary. We return the transaction to inform the calling code that
621 one if necessary. We return the transaction to inform the calling code that
624 a new transaction have been created (when applicable).
622 a new transaction have been created (when applicable).
625
623
626 Exists mostly to allow overriding for experimentation purpose"""
624 Exists mostly to allow overriding for experimentation purpose"""
627 pullop.todosteps.remove('obsmarkers')
625 pullop.todosteps.remove('obsmarkers')
628 tr = None
626 tr = None
629 if obsolete._enabled:
627 if obsolete._enabled:
630 pullop.repo.ui.debug('fetching remote obsolete markers\n')
628 pullop.repo.ui.debug('fetching remote obsolete markers\n')
631 remoteobs = pullop.remote.listkeys('obsolete')
629 remoteobs = pullop.remote.listkeys('obsolete')
632 if 'dump0' in remoteobs:
630 if 'dump0' in remoteobs:
633 tr = pullop.gettransaction()
631 tr = pullop.gettransaction()
634 for key in sorted(remoteobs, reverse=True):
632 for key in sorted(remoteobs, reverse=True):
635 if key.startswith('dump'):
633 if key.startswith('dump'):
636 data = base85.b85decode(remoteobs[key])
634 data = base85.b85decode(remoteobs[key])
637 pullop.repo.obsstore.mergemarkers(tr, data)
635 pullop.repo.obsstore.mergemarkers(tr, data)
638 pullop.repo.invalidatevolatilesets()
636 pullop.repo.invalidatevolatilesets()
639 return tr
637 return tr
640
638
641 def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
639 def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
642 **kwargs):
640 **kwargs):
643 """return a full bundle (with potentially multiple kind of parts)
641 """return a full bundle (with potentially multiple kind of parts)
644
642
645 Could be a bundle HG10 or a bundle HG2X depending on bundlecaps
643 Could be a bundle HG10 or a bundle HG2X depending on bundlecaps
646 passed. For now, the bundle can contain only changegroup, but this will
644 passed. For now, the bundle can contain only changegroup, but this will
647 changes when more part type will be available for bundle2.
645 changes when more part type will be available for bundle2.
648
646
649 This is different from changegroup.getbundle that only returns an HG10
647 This is different from changegroup.getbundle that only returns an HG10
650 changegroup bundle. They may eventually get reunited in the future when we
648 changegroup bundle. They may eventually get reunited in the future when we
651 have a clearer idea of the API we what to query different data.
649 have a clearer idea of the API we what to query different data.
652
650
653 The implementation is at a very early stage and will get massive rework
651 The implementation is at a very early stage and will get massive rework
654 when the API of bundle is refined.
652 when the API of bundle is refined.
655 """
653 """
656 # build changegroup bundle here.
654 # build changegroup bundle here.
657 cg = changegroup.getbundle(repo, source, heads=heads,
655 cg = changegroup.getbundle(repo, source, heads=heads,
658 common=common, bundlecaps=bundlecaps)
656 common=common, bundlecaps=bundlecaps)
659 if bundlecaps is None or 'HG2X' not in bundlecaps:
657 if bundlecaps is None or 'HG2X' not in bundlecaps:
660 return cg
658 return cg
661 # very crude first implementation,
659 # very crude first implementation,
662 # the bundle API will change and the generation will be done lazily.
660 # the bundle API will change and the generation will be done lazily.
663 b2caps = {}
661 b2caps = {}
664 for bcaps in bundlecaps:
662 for bcaps in bundlecaps:
665 if bcaps.startswith('bundle2='):
663 if bcaps.startswith('bundle2='):
666 blob = urllib.unquote(bcaps[len('bundle2='):])
664 blob = urllib.unquote(bcaps[len('bundle2='):])
667 b2caps.update(bundle2.decodecaps(blob))
665 b2caps.update(bundle2.decodecaps(blob))
668 bundler = bundle2.bundle20(repo.ui, b2caps)
666 bundler = bundle2.bundle20(repo.ui, b2caps)
669 if cg:
667 if cg:
670 bundler.newpart('b2x:changegroup', data=cg.getchunks())
668 bundler.newpart('b2x:changegroup', data=cg.getchunks())
671 _getbundleextrapart(bundler, repo, source, heads=heads, common=common,
669 _getbundleextrapart(bundler, repo, source, heads=heads, common=common,
672 bundlecaps=bundlecaps, **kwargs)
670 bundlecaps=bundlecaps, **kwargs)
673 return util.chunkbuffer(bundler.getchunks())
671 return util.chunkbuffer(bundler.getchunks())
674
672
675 def _getbundleextrapart(bundler, repo, source, heads=None, common=None,
673 def _getbundleextrapart(bundler, repo, source, heads=None, common=None,
676 bundlecaps=None, **kwargs):
674 bundlecaps=None, **kwargs):
677 """hook function to let extensions add parts to the requested bundle"""
675 """hook function to let extensions add parts to the requested bundle"""
678 pass
676 pass
679
677
680 def check_heads(repo, their_heads, context):
678 def check_heads(repo, their_heads, context):
681 """check if the heads of a repo have been modified
679 """check if the heads of a repo have been modified
682
680
683 Used by peer for unbundling.
681 Used by peer for unbundling.
684 """
682 """
685 heads = repo.heads()
683 heads = repo.heads()
686 heads_hash = util.sha1(''.join(sorted(heads))).digest()
684 heads_hash = util.sha1(''.join(sorted(heads))).digest()
687 if not (their_heads == ['force'] or their_heads == heads or
685 if not (their_heads == ['force'] or their_heads == heads or
688 their_heads == ['hashed', heads_hash]):
686 their_heads == ['hashed', heads_hash]):
689 # someone else committed/pushed/unbundled while we
687 # someone else committed/pushed/unbundled while we
690 # were transferring data
688 # were transferring data
691 raise error.PushRaced('repository changed while %s - '
689 raise error.PushRaced('repository changed while %s - '
692 'please try again' % context)
690 'please try again' % context)
693
691
694 def unbundle(repo, cg, heads, source, url):
692 def unbundle(repo, cg, heads, source, url):
695 """Apply a bundle to a repo.
693 """Apply a bundle to a repo.
696
694
697 this function makes sure the repo is locked during the application and have
695 this function makes sure the repo is locked during the application and have
698 mechanism to check that no push race occurred between the creation of the
696 mechanism to check that no push race occurred between the creation of the
699 bundle and its application.
697 bundle and its application.
700
698
701 If the push was raced as PushRaced exception is raised."""
699 If the push was raced as PushRaced exception is raised."""
702 r = 0
700 r = 0
703 # need a transaction when processing a bundle2 stream
701 # need a transaction when processing a bundle2 stream
704 tr = None
702 tr = None
705 lock = repo.lock()
703 lock = repo.lock()
706 try:
704 try:
707 check_heads(repo, heads, 'uploading changes')
705 check_heads(repo, heads, 'uploading changes')
708 # push can proceed
706 # push can proceed
709 if util.safehasattr(cg, 'params'):
707 if util.safehasattr(cg, 'params'):
710 try:
708 try:
711 tr = repo.transaction('unbundle')
709 tr = repo.transaction('unbundle')
712 tr.hookargs['bundle2-exp'] = '1'
710 tr.hookargs['bundle2-exp'] = '1'
713 r = bundle2.processbundle(repo, cg, lambda: tr).reply
711 r = bundle2.processbundle(repo, cg, lambda: tr).reply
714 cl = repo.unfiltered().changelog
712 cl = repo.unfiltered().changelog
715 p = cl.writepending() and repo.root or ""
713 p = cl.writepending() and repo.root or ""
716 repo.hook('b2x-pretransactionclose', throw=True, source=source,
714 repo.hook('b2x-pretransactionclose', throw=True, source=source,
717 url=url, pending=p, **tr.hookargs)
715 url=url, pending=p, **tr.hookargs)
718 tr.close()
716 tr.close()
719 repo.hook('b2x-transactionclose', source=source, url=url,
717 repo.hook('b2x-transactionclose', source=source, url=url,
720 **tr.hookargs)
718 **tr.hookargs)
721 except Exception, exc:
719 except Exception, exc:
722 exc.duringunbundle2 = True
720 exc.duringunbundle2 = True
723 raise
721 raise
724 else:
722 else:
725 r = changegroup.addchangegroup(repo, cg, source, url)
723 r = changegroup.addchangegroup(repo, cg, source, url)
726 finally:
724 finally:
727 if tr is not None:
725 if tr is not None:
728 tr.release()
726 tr.release()
729 lock.release()
727 lock.release()
730 return r
728 return r
General Comments 0
You need to be logged in to leave comments. Login now