changegroup: stop returning and recording added nodes in 'cg.apply'...
Boris Feld
r33461:bb72031f default
@@ -1,1854 +1,1853 @@
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is architectured as follows

 - magic string
 - stream level parameters
 - payload parts (any number)
 - end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

Binary format is as follows

:params size: int32

  The total number of Bytes used by the parameters

:params value: arbitrary number of Bytes

  A blob of `params size` containing the serialized version of all stream level
  parameters.

  The blob contains a space separated list of parameters. Parameters with a
  value are stored in the form `<name>=<value>`. Both name and value are
  urlquoted.

  Empty names are obviously forbidden.

  A name MUST start with a letter. If this first letter is lower case, the
  parameter is advisory and can be safely ignored. However when the first
  letter is capital, the parameter is mandatory and the bundling process MUST
  stop if it is not able to process it.

  Stream parameters use a simple textual format for two main reasons:

  - Stream level parameters should remain simple and we want to discourage any
    crazy usage.
  - Textual data allow easy human inspection of a bundle2 header in case of
    troubles.

  Any applicative level options MUST go into a bundle2 part instead.

Payload part
------------------------

Binary format is as follows

:header size: int32

  The total number of Bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

    The header defines how to interpret the part. It contains two pieces of
    data: the part type, and the part parameters.

    The part type is used to route to an application level handler that can
    interpret the payload.

    Part parameters are passed to the application level handler. They are
    meant to convey information that will help the application level object to
    interpret the part payload.

    The binary format of the header is as follows

    :typesize: (one byte)

    :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

    :partid: A 32bits integer (unique in the bundle) that can be used to refer
             to this part.

    :parameters:

        A part's parameters may have arbitrary content, the binary structure
        is::

            <mandatory-count><advisory-count><param-sizes><param-data>

        :mandatory-count: 1 byte, number of mandatory parameters

        :advisory-count: 1 byte, number of advisory parameters

        :param-sizes:

            N couples of bytes, where N is the total number of parameters.
            Each couple contains (<size-of-key>, <size-of-value>) for one
            parameter.

        :param-data:

            A blob of bytes from which each parameter key and value can be
            retrieved using the list of size couples stored in the previous
            field.

            Mandatory parameters come first, then the advisory ones.

            Each parameter's key MUST be unique within the part.

:payload:

    The payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as much as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part type
contains any uppercase char it is considered mandatory. When no handler is
known for a mandatory part, the process is aborted and an exception is raised.
If the part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read during an abort are processed. In
the future, dropping the stream may become an option for channels we do not
care to preserve.
"""

from __future__ import absolute_import

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    phases,
    pushkey,
    pycompat,
    scmutil,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

_fphasesentry = '>i20s'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

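# --- Illustrative sketch (editor's addition, not part of the module) ---
# The docstring above describes the stream layout: magic string, a sized
# stream-parameter block, then parts until an empty part header. Using the
# struct formats defined above, the smallest valid HG20 bundle is the magic
# string, a zero-length parameter block, and the end-of-stream marker. The
# function name is hypothetical.
def _demoemptybundle():
    """return the bytes of an empty HG20 bundle (illustration only)"""
    chunks = [
        'HG20',                       # magic string
        _pack(_fstreamparamsize, 0),  # no stream level parameters
        _pack(_fpartheadersize, 0),   # empty part header: end of stream
    ]
    return ''.join(chunks)
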
def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>'+('BB'*nbparams)

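# Illustrative sketch (editor's addition): the format returned above is
# consumed with the usual struct helpers. For two parameters the format is
# '>BBBB' and unpacking yields one (key size, value size) couple per
# parameter, mandatory ones first. The function name is hypothetical.
def _demoparamsizes():
    """round-trip two parameter size couples (illustration only)"""
    fmt = _makefpartparamsizes(2)     # '>BBBB'
    blob = _pack(fmt, 3, 5, 4, 0)     # ('key', 'value'), ('size', '')
    assert _unpack(fmt, blob) == (3, 5, 4, 0)
    return fmt
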
parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__

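# Illustrative sketch (editor's addition): basic use of unbundlerecords as
# described in its docstring. Categories are arbitrary strings and iteration
# preserves chronological order across categories. The function name is
# hypothetical.
def _demorecords():
    """exercise the unbundlerecords API (illustration only)"""
    records = unbundlerecords()
    records.add('changegroup', {'return': 1})
    records.add('output', 'remote: ok\n', inreplyto=0)
    assert records['changegroup'] == ({'return': 1},)
    assert len(records.getreplies(0)) == 1
    return list(records)   # [('changegroup', ...), ('output', ...)]
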
class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.gettransaction = transactiongetter
        self.reply = None
        self.captureoutput = captureoutput

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op

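# Illustrative sketch (editor's addition): the caller side of applybundle.
# The bundle file name and transaction name are hypothetical; a real caller
# obtains the unbundler from a stream and runs it inside a repo transaction,
# following the close/release pattern used elsewhere in this codebase.
def _demoapplybundle(repo):
    """apply a bundle file to a repo (illustration only)"""
    fp = open('changes.hg', 'rb')   # hypothetical bundle file
    try:
        unbundler = getunbundler(repo.ui, fp)
        tr = repo.transaction('demo-unbundle')
        try:
            op = applybundle(repo, unbundler, tr, source='unbundle',
                             url='file:changes.hg')
            tr.close()
            return op
        finally:
            tr.release()
    finally:
        fp.close()
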
def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op.gettransaction is None or op.gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except Exception as exc:
        # Any exceptions seeking to the end of the bundle at this point are
        # almost certainly related to the underlying stream being bad.
        # And, chances are that the exception we're handling is related to
        # getting in that bad state. So, we swallow the seeking error and
        # re-raise the original error.
        seekerror = False
        try:
            for nbpart, part in iterparts:
                # consume the bundle content
                part.seek(0, 2)
        except Exception:
            seekerror = True

        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from processing the old format. This is mostly
        # needed to handle different return codes to unbundle according to the
        # type of bundle. We should probably clean up or drop this return code
        # craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged

        # Re-raising from a variable loses the original stack. So only use
        # that form if we need to.
        if seekerror:
            raise exc
        else:
            raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op

 def _processchangegroup(op, cg, tr, source, url, **kwargs):
-    ret, addednodes = cg.apply(op.repo, tr, source, url, **kwargs)
+    ret = cg.apply(op.repo, tr, source, url, **kwargs)
     op.records.add('changegroup', {
         'return': ret,
-        'addednodes': addednodes,
     })
     return ret

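# Illustrative sketch (editor's addition): with the change above, the
# 'changegroup' records only carry the 'return' value; callers that used to
# read 'addednodes' from these records can no longer do so. A hypothetical
# caller inspecting the outcome of an unbundle would do:
def _demochangegroupresult(op):
    """collect changegroup return codes from a bundleoperation (illustration)"""
    # see changegroup.apply for the meaning of the integer return values
    return [r['return'] for r in op.records['changegroup']]
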
def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    hardabort = False
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                      params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam('in-reply-to', str(part.id), mandatory=False)
    # If exiting or interrupted, do not attempt to seek the stream in the
    # finally block below. This makes abort faster.
    except (SystemExit, KeyboardInterrupt):
        hardabort = True
        raise
    finally:
        # consume the part content to not corrupt the stream.
        if not hardabort:
            part.seek(0, 2)


def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

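# Illustrative sketch (editor's addition): encodecaps and decodecaps
# round-trip, including the comma-separated, url-quoted value lists. The
# function name is hypothetical.
def _democaps():
    """round-trip a capabilities dict through the blob format (illustration)"""
    caps = {'HG20': (), 'changegroup': ('01', '02')}
    blob = encodecaps(caps)           # 'HG20\nchangegroup=01,02'
    decoded = decodecaps(blob)
    assert decoded['changegroup'] == ['01', '02']
    return decoded
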
bundletypes = {
    "": ("", 'UN'),       # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG20": (),           # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the containers

        The part is directly added to the container. For now, this means that
        any failure to properly initialize the part after calling ``newpart``
        should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged


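# Illustrative sketch (editor's addition): producing a bundle2 container
# with the bundle20 class above and writing it to disk. The part type
# 'output' is one the module itself uses; the file name and function name
# are hypothetical.
def _demowritebundle(ui):
    """write a one-part bundle2 file (illustration only)"""
    bundler = bundle20(ui)
    bundler.newpart('output', data='hello', mandatory=False)
    fp = open('demo.hg20', 'wb')
    try:
        for chunk in bundler.getchunks():
            fp.write(chunk)
    finally:
        fp.close()
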
class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

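# Illustrative sketch (editor's addition): reading a bundle back with
# getunbundler and walking its parts. processbundle() is the real consumer;
# this only lists part types. The file name and function name are
# hypothetical.
def _demoreadbundle(ui):
    """list the part types of a bundle2 file (illustration only)"""
    fp = open('demo.hg20', 'rb')
    try:
        unbundler = getunbundler(ui, fp)
        if isinstance(unbundler, unbundle20):
            return [part.type for part in unbundler.iterparts()]
        return []
    finally:
        fp.close()
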
class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` methods."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process and return the stream level parameter block"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory and this function will raise a KeyError when unknown.

        Note: no options are currently supported. Any input will be either
        ignored or failing.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact the 'getbundle' command over 'ssh'
        has no way to know when the reply ends, relying on the bundle to be
        interpreted to know its end. This is terrible and we are sorry, but we
        needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i')
            yield self._readexact(size)


    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, payload need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in formatmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True

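# Illustrative sketch (editor's addition): registering a handler for a
# hypothetical advisory stream level parameter with the decorator above.
# Because the name is lower case, unbundlers without this handler simply
# ignore the parameter.
@b2streamparamhandler('demoparam')
def _processdemoparam(unbundler, param, value):
    """record a demo stream parameter (illustration only)"""
    indebug(unbundler.ui, 'demo stream parameter: %s=%s' % (param, value))
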
class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. The remote side
    should be able to safely ignore the advisory ones.

    Neither data nor parameters can be modified after the generation has
    begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
                % (cls, id(self), self.id, self.type, self.mandatory))

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data
917 self._data = data
919
918
920 @property
919 @property
921 def mandatoryparams(self):
920 def mandatoryparams(self):
922 # make it an immutable tuple to force people through ``addparam``
921 # make it an immutable tuple to force people through ``addparam``
923 return tuple(self._mandatoryparams)
922 return tuple(self._mandatoryparams)
924
923
925 @property
924 @property
926 def advisoryparams(self):
925 def advisoryparams(self):
927 # make it an immutable tuple to force people through ``addparam``
926 # make it an immutable tuple to force people through ``addparam``
928 return tuple(self._advisoryparams)
927 return tuple(self._advisoryparams)
929
928
930 def addparam(self, name, value='', mandatory=True):
929 def addparam(self, name, value='', mandatory=True):
931 """add a parameter to the part
930 """add a parameter to the part
932
931
933 If 'mandatory' is set to True, the remote handler must claim support
932 If 'mandatory' is set to True, the remote handler must claim support
934 for this parameter or the unbundling will be aborted.
933 for this parameter or the unbundling will be aborted.
935
934
936 The 'name' and 'value' cannot exceed 255 bytes each.
935 The 'name' and 'value' cannot exceed 255 bytes each.
937 """
936 """
938 if self._generated is not None:
937 if self._generated is not None:
939 raise error.ReadOnlyPartError('part is being generated')
938 raise error.ReadOnlyPartError('part is being generated')
940 if name in self._seenparams:
939 if name in self._seenparams:
941 raise ValueError('duplicated params: %s' % name)
940 raise ValueError('duplicated params: %s' % name)
942 self._seenparams.add(name)
941 self._seenparams.add(name)
943 params = self._advisoryparams
942 params = self._advisoryparams
944 if mandatory:
943 if mandatory:
945 params = self._mandatoryparams
944 params = self._mandatoryparams
946 params.append((name, value))
945 params.append((name, value))
947
946
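# Example (illustrative sketch): building a part by hand; the part type and
# parameter names below are hypothetical:
#
#     part = bundlepart('myext:greeting', data='hello')
#     part.addparam('lang', 'en')                     # mandatory by default
#     part.addparam('color', 'blue', mandatory=False) # safe to ignore
#
# Once 'getchunks' has started, further 'addparam' calls or data assignment
# raise error.ReadOnlyPartError.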
    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % exc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % exc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if util.safehasattr(self.data, 'next'):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data
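# Example (illustrative sketch): the on-the-wire framing produced by
# 'bundlepart.getchunks' is <int32 header size><header><int32 chunk size>
# <chunk>... terminated by a zero-sized chunk. A hypothetical caller that
# writes a single part to a file object 'fp' could do:
#
#     part = bundlepart('output', data='hello')
#     part.id = 0  # normally assigned when the part is added to a bundle20
#     for chunk in part.getchunks(ui=ui):
#         fp.write(chunk)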
flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting exceptions raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')
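# Example (illustrative sketch): an interrupt is signaled in-band by a
# payload chunk size of -1 ('flaginterrupt'). The producer side, as seen in
# 'bundlepart.getchunks' above, emits something like:
#
#     yield _pack(_fpayloadsize, -1)           # switch to out of band part
#     for chunk in interpart.getchunks(ui=ui):
#         yield chunk                          # e.g. an 'error:abort' part
#     yield _pack(_fpayloadsize, 0)            # close the aborted payload
#
# while the consumer reacts in 'unbundlepart._payloadchunks' by running
# 'interrupthandler' on the embedded part before resuming the stream.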
class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] # (payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos, self._tellfp()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % self.id)
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of pairs again
        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed'))
            self._pos = newpos
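# Example (illustrative sketch): 'unbundlepart' supports file-like random
# access within the payload when the underlying stream is seekable, using
# the chunk index recorded during the first pass:
#
#     data = part.read(10)   # consume the first ten bytes
#     part.seek(0)           # rewind via the recorded chunk index
#     assert part.read(10) == data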
    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    return caps
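# Example (illustrative sketch): an extension could advertise an extra
# capability by wrapping 'getrepocaps'; the capability name and wrapper are
# hypothetical, and this assumes 'from mercurial import bundle2, extensions':
#
#     def extgetrepocaps(orig, repo, **kwargs):
#         caps = orig(repo, **kwargs)
#         caps['mycapability'] = ('v1',)
#         return caps
#
#     extensions.wrapfunction(bundle2, 'getrepocaps', extgetrepocaps)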
def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]
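# Example (illustrative sketch): with capabilities advertising V0 and V1
# markers,
#
#     obsmarkersversion({'obsmarkers': ('V0', 'V1')})
#
# returns [0, 1]; a missing 'obsmarkers' entry yields an empty list.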
def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The type of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we always want a changegroup in such a bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.getchangegroup(repo, source, outgoing,
                                    version=cgversion)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        part.addparam('nbchanges', str(cg.extras['clcount']),
                      mandatory=False)
    if opts.get('phases') and repo.revs('%ln and secret()',
                                        outgoing.missingheads):
        part.addparam('targetphase', '%d' % phases.secret, mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = []
        for phase in phases.allphases:
            for head in headsbyphase[phase]:
                phasedata.append(_pack(_fphasesentry, phase, head))
        bundler.newpart('phase-heads', data=''.join(phasedata))

def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changeset
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', str(cg.extras['clcount']),
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only support v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
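# Example (illustrative sketch): writing a changegroup 'cg' out as an
# uncompressed bundle2 file; the filename here is hypothetical:
#
#     writebundle(ui, cg, 'changes.hg', 'HG20', compression=None)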
def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result
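# Example (illustrative sketch of the arithmetic above): two changegroup
# records returning 3 and 2 (i.e. two new heads and one new head) give
# changedheads = 2 + 1 = 3, so the combined result is 1 + 3 = 4; any record
# returning 0 forces the combined result to 0.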
@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
                             'targetphase'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will see massive rework before
    being inflicted on any end user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the one contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    extrakwargs = {}
    targetphase = inpart.params.get('targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()
1563 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1562 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1564 ['digest:%s' % k for k in util.DIGESTS.keys()])
1563 ['digest:%s' % k for k in util.DIGESTS.keys()])
1565 @parthandler('remote-changegroup', _remotechangegroupparams)
1564 @parthandler('remote-changegroup', _remotechangegroupparams)
1566 def handleremotechangegroup(op, inpart):
1565 def handleremotechangegroup(op, inpart):
1567 """apply a bundle10 on the repo, given an url and validation information
1566 """apply a bundle10 on the repo, given an url and validation information
1568
1567
1569 All the information about the remote bundle to import are given as
1568 All the information about the remote bundle to import are given as
1570 parameters. The parameters include:
1569 parameters. The parameters include:
1571 - url: the url to the bundle10.
1570 - url: the url to the bundle10.
1572 - size: the bundle10 file size. It is used to validate what was
1571 - size: the bundle10 file size. It is used to validate what was
1573 retrieved by the client matches the server knowledge about the bundle.
1572 retrieved by the client matches the server knowledge about the bundle.
1574 - digests: a space separated list of the digest types provided as
1573 - digests: a space separated list of the digest types provided as
1575 parameters.
1574 parameters.
1576 - digest:<digest-type>: the hexadecimal representation of the digest with
1575 - digest:<digest-type>: the hexadecimal representation of the digest with
1577 that name. Like the size, it is used to validate what was retrieved by
1576 that name. Like the size, it is used to validate what was retrieved by
1578 the client matches what the server knows about the bundle.
1577 the client matches what the server knows about the bundle.
1579
1578
1580 When multiple digest types are given, all of them are checked.
1579 When multiple digest types are given, all of them are checked.
1581 """
1580 """
1582 try:
1581 try:
1583 raw_url = inpart.params['url']
1582 raw_url = inpart.params['url']
1584 except KeyError:
1583 except KeyError:
1585 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1584 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1586 parsed_url = util.url(raw_url)
1585 parsed_url = util.url(raw_url)
1587 if parsed_url.scheme not in capabilities['remote-changegroup']:
1586 if parsed_url.scheme not in capabilities['remote-changegroup']:
1588 raise error.Abort(_('remote-changegroup does not support %s urls') %
1587 raise error.Abort(_('remote-changegroup does not support %s urls') %
1589 parsed_url.scheme)
1588 parsed_url.scheme)
1590
1589
1591 try:
1590 try:
1592 size = int(inpart.params['size'])
1591 size = int(inpart.params['size'])
1593 except ValueError:
1592 except ValueError:
1594 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1593 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1595 % 'size')
1594 % 'size')
1596 except KeyError:
1595 except KeyError:
1597 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1596 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1598
1597
1599 digests = {}
1598 digests = {}
1600 for typ in inpart.params.get('digests', '').split():
1599 for typ in inpart.params.get('digests', '').split():
1601 param = 'digest:%s' % typ
1600 param = 'digest:%s' % typ
1602 try:
1601 try:
1603 value = inpart.params[param]
1602 value = inpart.params[param]
1604 except KeyError:
1603 except KeyError:
1605 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1604 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1606 param)
1605 param)
1607 digests[typ] = value
1606 digests[typ] = value
1608
1607
1609 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1608 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1610
1609
1611 tr = op.gettransaction()
1610 tr = op.gettransaction()
1612 from . import exchange
1611 from . import exchange
1613 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1612 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1614 if not isinstance(cg, changegroup.cg1unpacker):
1613 if not isinstance(cg, changegroup.cg1unpacker):
1615 raise error.Abort(_('%s: not a bundle version 1.0') %
1614 raise error.Abort(_('%s: not a bundle version 1.0') %
1616 util.hidepassword(raw_url))
1615 util.hidepassword(raw_url))
1617 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1616 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1618 if op.reply is not None:
1617 if op.reply is not None:
1619 # This is definitely not the final form of this
1618 # This is definitely not the final form of this
1620 # return. But one need to start somewhere.
1619 # return. But one need to start somewhere.
1621 part = op.reply.newpart('reply:changegroup')
1620 part = op.reply.newpart('reply:changegroup')
1622 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1621 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1623 part.addparam('return', '%i' % ret, mandatory=False)
1622 part.addparam('return', '%i' % ret, mandatory=False)
1624 try:
1623 try:
1625 real_part.validate()
1624 real_part.validate()
1626 except error.Abort as e:
1625 except error.Abort as e:
1627 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1626 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1628 (util.hidepassword(raw_url), str(e)))
1627 (util.hidepassword(raw_url), str(e)))
1629 assert not inpart.read()
1628 assert not inpart.read()

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')
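
# Illustrative sketch (not part of the original module): the 'check:heads'
# payload is just the concatenation of raw 20-byte head nodes, which the
# handler above reads back in 20-byte slices. Assuming a bundle20 instance
# named 'bundler' and a list of binary nodes 'remoteheads' on the client
# side:
#
#     bundler.newpart('check:heads', data=iter(remoteheads))
#
# Any head added or removed on the server between discovery and unbundle
# then fails the sorted comparison and aborts the push with PushRaced.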

@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for a race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually
    updated during the push; activity on unrelated heads is ignored.

    This allows servers with high traffic to avoid push contention as long
    as only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit an abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit an unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit a push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
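
# Illustrative sketch (not part of the original module): the sending side of
# a 'pushkey' part mirrors the decoding above, attaching the four values as
# encoded mandatory parameters. Assuming a bundle20 instance 'bundler' and
# hypothetical string values 'bookmark', 'oldhex' and 'newhex':
#
#     enc = pushkey.encode
#     part = bundler.newpart('pushkey')
#     part.addparam('namespace', enc('bookmarks'))
#     part.addparam('key', enc(bookmark))
#     part.addparam('old', enc(oldhex))
#     part.addparam('new', enc(newhex))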

def _readphaseheads(inpart):
    headsbyphase = [[] for i in phases.allphases]
    entrysize = struct.calcsize(_fphasesentry)
    while True:
        entry = inpart.read(entrysize)
        if len(entry) < entrysize:
            if entry:
                raise error.Abort(_('bad phase-heads bundle part'))
            break
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
    return headsbyphase

@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = _readphaseheads(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
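
# Illustrative sketch (not part of the original module): each 'phase-heads'
# entry is one fixed-size struct record, a phase number followed by a raw
# 20-byte node. Assuming _fphasesentry uses a '>i20s' layout, a payload for
# 'headsbyphase' (a list of node lists indexed by phase) could be built as:
#
#     data = ''.join(struct.pack(_fphasesentry, phase, head)
#                    for phase, heads in enumerate(headsbyphase)
#                    for head in heads)
#     bundler.newpart('phase-heads', data=data)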

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not '
                         'enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    scmutil.registersummarycallback(op.repo, tr)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers part"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
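
# Illustrative sketch (not part of the original module): the 'hgtagsfnodes'
# payload is a flat sequence of (changeset node, .hgtags filenode) pairs,
# 40 bytes per entry with no per-entry framing. Assuming binary 20-byte
# nodes in a hypothetical 'entries' list:
#
#     data = ''.join(node + fnode for node, fnode in entries)
#     bundler.newpart('hgtagsfnodes', data=data)
#
# The handler above deliberately ignores a truncated trailing pair instead
# of aborting, since the cache content is only an optimization.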
@@ -1,1009 +1,1009 b''
# changegroup.py - Mercurial changegroup manipulation functions
#
# Copyright 2006 Matt Mackall <mpm@selenic.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import os
import struct
import tempfile
import weakref

from .i18n import _
from .node import (
    hex,
    nullrev,
    short,
)

from . import (
    dagutil,
    discovery,
    error,
    mdiff,
    phases,
    pycompat,
    util,
)

_CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
_CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
_CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"

def readexactly(stream, n):
    '''read n bytes from stream.read and abort if fewer were available'''
    s = stream.read(n)
    if len(s) < n:
        raise error.Abort(_("stream ended unexpectedly"
                            " (got %d bytes, expected %d)")
                          % (len(s), n))
    return s

def getchunk(stream):
    """return the next chunk from stream as a string"""
    d = readexactly(stream, 4)
    l = struct.unpack(">l", d)[0]
    if l <= 4:
        if l:
            raise error.Abort(_("invalid chunk length %d") % l)
        return ""
    return readexactly(stream, l - 4)

def chunkheader(length):
    """return a changegroup chunk header (string)"""
    return struct.pack(">l", length + 4)

def closechunk():
    """return a changegroup chunk header (string) for a zero-length chunk"""
    return struct.pack(">l", 0)
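
# Illustrative sketch (not part of the original module): a chunk is a
# big-endian int32 length (counting the 4 header bytes themselves) followed
# by the payload, and a zero length closes a group. A round-trip through an
# in-memory stream, assuming Python 2 bytestrings:
#
#     import io
#     buf = io.BytesIO(chunkheader(5) + "hello" + closechunk())
#     getchunk(buf)    # -> "hello"
#     getchunk(buf)    # -> ""  (end-of-group marker)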

def writechunks(ui, chunks, filename, vfs=None):
    """Write chunks to a file and return its filename.

    The stream is assumed to be a bundle file.
    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    """
    fh = None
    cleanup = None
    try:
        if filename:
            if vfs:
                fh = vfs.open(filename, "wb")
            else:
                # Increase default buffer size because default is usually
                # small (4k is common on Linux).
                fh = open(filename, "wb", 131072)
        else:
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, pycompat.sysstr("wb"))
        cleanup = filename
        for c in chunks:
            fh.write(c)
        cleanup = None
        return filename
    finally:
        if fh is not None:
            fh.close()
        if cleanup is not None:
            if filename and vfs:
                vfs.unlink(cleanup)
            else:
                os.unlink(cleanup)

class cg1unpacker(object):
    """Unpacker for cg1 changegroup streams.

    A changegroup unpacker handles the framing of the revision data in
    the wire format. Most consumers will want to use the apply()
    method to add the changes from the changegroup to a repository.

    If you're forwarding a changegroup unmodified to another consumer,
    use getchunks(), which returns an iterator of changegroup
    chunks. This is mostly useful for cases where you need to know the
    data stream has ended by observing the end of the changegroup.

    deltachunk() is useful only if you're applying delta data. Most
    consumers should prefer apply() instead.

    A few other public methods exist. Those are used only for
    bundlerepo and some debug commands - their use is discouraged.
    """
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '01'
    _grouplistcount = 1 # One list of files after the manifests

    def __init__(self, fh, alg, extras=None):
        if alg is None:
            alg = 'UN'
        if alg not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % alg)
        if alg == 'BZ':
            alg = '_truncatedBZ'

        compengine = util.compengines.forbundletype(alg)
        self._stream = compengine.decompressorreader(fh)
        self._type = alg
        self.extras = extras or {}
        self.callback = None

    # These methods (compressed, read, seek, tell) all appear to only
    # be used by bundlerepo, but it's a little hard to tell.
    def compressed(self):
        return self._type is not None and self._type != 'UN'
    def read(self, l):
        return self._stream.read(l)
    def seek(self, pos):
        return self._stream.seek(pos)
    def tell(self):
        return self._stream.tell()
    def close(self):
        return self._stream.close()

    def _chunklength(self):
        d = readexactly(self._stream, 4)
        l = struct.unpack(">l", d)[0]
        if l <= 4:
            if l:
                raise error.Abort(_("invalid chunk length %d") % l)
            return 0
        if self.callback:
            self.callback()
        return l - 4

    def changelogheader(self):
        """v10 does not have a changelog header chunk"""
        return {}

    def manifestheader(self):
        """v10 does not have a manifest header chunk"""
        return {}

    def filelogheader(self):
        """return the header of the filelogs chunk, v10 only has the filename"""
        l = self._chunklength()
        if not l:
            return {}
        fname = readexactly(self._stream, l)
        return {'filename': fname}

    def _deltaheader(self, headertuple, prevnode):
        node, p1, p2, cs = headertuple
        if prevnode is None:
            deltabase = p1
        else:
            deltabase = prevnode
        flags = 0
        return node, p1, p2, deltabase, cs, flags

    def deltachunk(self, prevnode):
        l = self._chunklength()
        if not l:
            return {}
        headerdata = readexactly(self._stream, self.deltaheadersize)
        header = struct.unpack(self.deltaheader, headerdata)
        delta = readexactly(self._stream, l - self.deltaheadersize)
        node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
        return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
                'deltabase': deltabase, 'delta': delta, 'flags': flags}
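
    # Illustrative note (not part of the original class): a cg1 delta header
    # is four raw 20-byte nodes with no explicit delta base; the base is
    # implied, p1 for the first chunk and the previous node afterwards, as
    # _deltaheader() above shows. Unpacking one header by hand would be:
    #
    #     node, p1, p2, cs = struct.unpack(_CHANGEGROUPV1_DELTA_HEADER, data)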

    def getchunks(self):
        """returns all the chunks contained in the bundle

        Used when you need to forward the binary stream to a file or another
        network API. To do so, it parses the changegroup data; otherwise it
        would block for sshrepo because it does not know where the stream
        ends.
        """
        # an empty chunkgroup is the end of the changegroup
        # a changegroup has at least 2 chunkgroups (changelog and manifest).
        # after that, changegroup versions 1 and 2 have a series of groups
        # with one group per file. changegroup 3 has a series of directory
        # manifests before the files.
        count = 0
        emptycount = 0
        while emptycount < self._grouplistcount:
            empty = True
            count += 1
            while True:
                chunk = getchunk(self)
                if not chunk:
                    if empty and count > 2:
                        emptycount += 1
                    break
                empty = False
                yield chunkheader(len(chunk))
                pos = 0
                while pos < len(chunk):
                    next = pos + 2**20
                    yield chunk[pos:next]
                    pos = next
            yield closechunk()
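
    # Illustrative note (not part of the original class): getchunks() is the
    # right tool for relaying a bundle without applying it, for example to
    # spool an incoming changegroup to disk:
    #
    #     writechunks(ui, cg.getchunks(), filename)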

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        # We know that we'll never have more manifests than we had
        # changesets.
        self.callback = prog(_('manifests'), numchanges)
        # no need to check for empty manifest group here:
        # if the result of the merge of 1 and 2 is the same in 3 and 4,
        # no new manifest will be created and the manifest group will
        # be empty during the pull
        self.manifestheader()
        repo.manifestlog._revlog.addgroup(self, revmap, trp)
        repo.ui.progress(_('manifests'), None)
        self.callback = None

    def apply(self, repo, tr, srctype, url, targetphase=phases.draft,
              expectedtotal=None):
        """Add the changegroup returned by source.read() to this repo.
        srctype is a string like 'push', 'pull', or 'unbundle'. url is
        the URL of the repo where this changegroup is coming from.

        Return an integer summarizing the change to this repo:
        - nothing changed or no source: 0
        - more heads than before: 1+added heads (2..n)
        - fewer heads than before: -1-removed heads (-2..-n)
        - number of heads stays the same: 1
        """
        repo = repo.unfiltered()
        def csmap(x):
            repo.ui.debug("add changeset %s\n" % short(x))
            return len(cl)

        def revmap(x):
            return cl.rev(x)

        changesets = files = revisions = 0

        try:
            # The transaction may already carry source information. In this
            # case we use the top level data. We overwrite the argument
            # because we need to use the top level value (if they exist)
            # in this function.
            srctype = tr.hookargs.setdefault('source', srctype)
            url = tr.hookargs.setdefault('url', url)
            repo.hook('prechangegroup', throw=True, **tr.hookargs)

            # write changelog data to temp files so concurrent readers
            # will not see an inconsistent view
            cl = repo.changelog
            cl.delayupdate(tr)
            oldheads = set(cl.heads())

            trp = weakref.proxy(tr)
            # pull off the changeset group
            repo.ui.status(_("adding changesets\n"))
            clstart = len(cl)
            class prog(object):
                def __init__(self, step, total):
                    self._step = step
                    self._total = total
                    self._count = 1
                def __call__(self):
                    repo.ui.progress(self._step, self._count, unit=_('chunks'),
                                     total=self._total)
                    self._count += 1
            self.callback = prog(_('changesets'), expectedtotal)

            efiles = set()
            def onchangelog(cl, node):
                efiles.update(cl.readfiles(node))

            self.changelogheader()
            cgnodes = cl.addgroup(self, csmap, trp, addrevisioncb=onchangelog)
            efiles = len(efiles)

            if not cgnodes:
                repo.ui.develwarn('applied empty changegroup',
                                  config='empty-changegroup')
            clend = len(cl)
            changesets = clend - clstart
            repo.ui.progress(_('changesets'), None)
            self.callback = None

            # pull off the manifest group
            repo.ui.status(_("adding manifests\n"))
            self._unpackmanifests(repo, revmap, trp, prog, changesets)

            needfiles = {}
            if repo.ui.configbool('server', 'validate'):
                cl = repo.changelog
                ml = repo.manifestlog
                # validate incoming csets have their manifests
                for cset in xrange(clstart, clend):
                    mfnode = cl.changelogrevision(cset).manifest
                    mfest = ml[mfnode].readdelta()
                    # store file cgnodes we must see
                    for f, n in mfest.iteritems():
                        needfiles.setdefault(f, set()).add(n)

            # process the files
            repo.ui.status(_("adding file changes\n"))
            newrevs, newfiles = _addchangegroupfiles(
                repo, self, revmap, trp, efiles, needfiles)
            revisions += newrevs
            files += newfiles

            deltaheads = 0
            if oldheads:
                heads = cl.heads()
                deltaheads = len(heads) - len(oldheads)
                for h in heads:
                    if h not in oldheads and repo[h].closesbranch():
                        deltaheads -= 1
            htext = ""
            if deltaheads:
                htext = _(" (%+d heads)") % deltaheads

            repo.ui.status(_("added %d changesets"
                             " with %d changes to %d files%s\n")
                           % (changesets, revisions, files, htext))
            repo.invalidatevolatilesets()

            if changesets > 0:
                if 'node' not in tr.hookargs:
                    tr.hookargs['node'] = hex(cl.node(clstart))
                    tr.hookargs['node_last'] = hex(cl.node(clend - 1))
                    hookargs = dict(tr.hookargs)
                else:
                    hookargs = dict(tr.hookargs)
                    hookargs['node'] = hex(cl.node(clstart))
                    hookargs['node_last'] = hex(cl.node(clend - 1))
                repo.hook('pretxnchangegroup', throw=True, **hookargs)

            added = [cl.node(r) for r in xrange(clstart, clend)]
            phaseall = None
            if srctype in ('push', 'serve'):
                # Old servers can not push the boundary themselves.
                # New servers won't push the boundary if changeset already
                # exists locally as secret
                #
                # We should not use added here but the list of all changes
                # in the bundle
                if repo.publishing():
                    targetphase = phaseall = phases.public
                else:
                    # closer target phase computation

                    # Those changesets have been pushed from the
                    # outside, their phases are going to be pushed
                    # alongside. Therefore `targetphase` is
                    # ignored.
                    targetphase = phaseall = phases.draft
            if added:
                phases.registernew(repo, tr, targetphase, added)
            if phaseall is not None:
                phases.advanceboundary(repo, tr, phaseall, cgnodes)

            if changesets > 0:

                def runhooks():
                    # These hooks run when the lock releases, not when the
                    # transaction closes. So it's possible for the changelog
                    # to have changed since we last saw it.
                    if clstart >= len(repo):
                        return

                    repo.hook("changegroup", **hookargs)

                    for n in added:
                        args = hookargs.copy()
                        args['node'] = hex(n)
                        del args['node_last']
                        repo.hook("incoming", **args)

                    newheads = [h for h in repo.heads()
                                if h not in oldheads]
                    repo.ui.log("incoming",
                                "%s incoming changes - new heads: %s\n",
                                len(added),
                                ', '.join([hex(c[:6]) for c in newheads]))

                tr.addpostclose('changegroup-runhooks-%020i' % clstart,
                                lambda tr: repo._afterlock(runhooks))
        finally:
            repo.ui.flush()
        # never return 0 here:
        if deltaheads < 0:
            ret = deltaheads - 1
        else:
            ret = deltaheads + 1
        return ret
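
    # Illustrative note (not part of the original class): callers can decode
    # apply()'s return value without re-counting heads themselves:
    #
    #     ret = cg.apply(repo, tr, 'pull', remoteurl)
    #     if ret == 0:
    #         pass  # nothing changed (or no source)
    #     elif ret > 1:
    #         pass  # ret - 1 new heads appeared
    #     elif ret < 0:
    #         pass  # -ret - 1 heads went away
    #     # ret == 1: changesets added, head count unchanged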

class cg2unpacker(cg1unpacker):
    """Unpacker for cg2 streams.

    cg2 streams add support for generaldelta, so the delta header
    format is slightly different. All other features about the data
    remain the same.
    """
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '02'

    def _deltaheader(self, headertuple, prevnode):
        node, p1, p2, deltabase, cs = headertuple
        flags = 0
        return node, p1, p2, deltabase, cs, flags

class cg3unpacker(cg2unpacker):
    """Unpacker for cg3 streams.

    cg3 streams add support for exchanging treemanifests and revlog
    flags. It adds the revlog flags to the delta header and an empty chunk
    separating manifests and files.
    """
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '03'
    _grouplistcount = 2 # One list of manifests and one list of files

    def _deltaheader(self, headertuple, prevnode):
        node, p1, p2, deltabase, cs, flags = headertuple
        return node, p1, p2, deltabase, cs, flags

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog,
                                                  numchanges)
        for chunkdata in iter(self.filelogheader, {}):
            # If we get here, there are directory manifests in the changegroup
            d = chunkdata["filename"]
            repo.ui.debug("adding %s revisions\n" % d)
            dirlog = repo.manifestlog._revlog.dirlog(d)
            if not dirlog.addgroup(self, revmap, trp):
                raise error.Abort(_("received dir revlog group is empty"))
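
    # Illustrative note (not part of the original class): the cg3 header
    # extends cg2's explicit-deltabase layout with a 16-bit flags field, so
    # unpacking reads six fields instead of cg1's four:
    #
    #     node, p1, p2, deltabase, cs, flags = struct.unpack(
    #         _CHANGEGROUPV3_DELTA_HEADER, data)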

class headerlessfixup(object):
    """File-like wrapper that replays already-consumed header bytes.

    read() serves the saved header first, then falls through to the
    underlying stream.
    """
    def __init__(self, fh, h):
        self._h = h
        self._fh = fh
    def read(self, n):
        if self._h:
            d, self._h = self._h[:n], self._h[n:]
            if len(d) < n:
                d += readexactly(self._fh, n - len(d))
            return d
        return readexactly(self._fh, n)
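
# Illustrative sketch (not part of the original module): headerlessfixup is
# handy when a stream's magic was eaten while sniffing its type. Assuming
# 'fh' is any file-like object:
#
#     magic = readexactly(fh, 4)         # consumed during type detection
#     fh = headerlessfixup(fh, magic)    # read() now yields the magic first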

class cg1packer(object):
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    version = '01'
    def __init__(self, repo, bundlecaps=None):
        """Given a source repo, construct a bundler.

        bundlecaps is optional and can be used to specify the set of
        capabilities which can be used to build the bundle. While bundlecaps
        is unused in core Mercurial, extensions rely on this feature to
        communicate capabilities to customize the changegroup packer.
        """
        # Set of capabilities we can use to build the bundle.
        if bundlecaps is None:
            bundlecaps = set()
        self._bundlecaps = bundlecaps
        # experimental config: bundle.reorder
        reorder = repo.ui.config('bundle', 'reorder')
        if reorder == 'auto':
            reorder = None
        else:
            reorder = util.parsebool(reorder)
        self._repo = repo
        self._reorder = reorder
        self._progress = repo.ui.progress
        if self._repo.ui.verbose and not self._repo.ui.debugflag:
            self._verbosenote = self._repo.ui.note
        else:
            self._verbosenote = lambda s: None

    def close(self):
        return closechunk()

    def fileheader(self, fname):
        return chunkheader(len(fname)) + fname

    # Extracted both for clarity and for overriding in extensions.
    def _sortgroup(self, revlog, nodelist, lookup):
        """Sort nodes for changegroup and turn them into revnums."""
        # for generaldelta revlogs, we linearize the revs; this will both be
        # much quicker and generate a much smaller bundle
        if (revlog._generaldelta and self._reorder is None) or self._reorder:
            dag = dagutil.revlogdag(revlog)
            return dag.linearize(set(revlog.rev(n) for n in nodelist))
        else:
            return sorted([revlog.rev(n) for n in nodelist])

    def group(self, nodelist, revlog, lookup, units=None):
        """Calculate a delta group, yielding a sequence of changegroup chunks
        (strings).

        Given a list of changeset revs, return a set of deltas and
        metadata corresponding to nodes. The first delta is
        first parent(nodelist[0]) -> nodelist[0], the receiver is
        guaranteed to have this parent as it has all history before
        these changesets. If the first parent is nullrev, the
        changegroup starts with a full revision.

        If units is not None, progress detail will be generated; units
        specifies the type of revlog that is touched (changelog, manifest,
        etc.).
        """
        # if we don't have any revisions touched by these changesets, bail
        if len(nodelist) == 0:
            yield self.close()
            return

        revs = self._sortgroup(revlog, nodelist, lookup)

        # add the parent of the first rev
        p = revlog.parentrevs(revs[0])[0]
        revs.insert(0, p)

        # build deltas
        total = len(revs) - 1
        msgbundling = _('bundling')
        for r in xrange(len(revs) - 1):
            if units is not None:
                self._progress(msgbundling, r + 1, unit=units, total=total)
            prev, curr = revs[r], revs[r + 1]
            linknode = lookup(revlog.node(curr))
            for c in self.revchunk(revlog, curr, prev, linknode):
                yield c

        if units is not None:
            self._progress(msgbundling, None)
        yield self.close()

    # filter any nodes that claim to be part of the known set
    def prune(self, revlog, missing, commonrevs):
        rr, rl = revlog.rev, revlog.linkrev
        return [n for n in missing if rl(rr(n)) not in commonrevs]

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        """Pack flat manifests into a changegroup stream."""
        assert not dir
        for chunk in self.group(mfnodes, self._repo.manifestlog._revlog,
                                lookuplinknode, units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        return ''

    def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
        '''yield a sequence of changegroup chunks (strings)'''
        repo = self._repo
        cl = repo.changelog

        clrevorder = {}
        mfs = {} # needed manifests
        fnodes = {} # needed file nodes
        changedfiles = set()

        # Callback for the changelog, used to collect changed files and
        # manifest nodes.
        # Returns the linkrev node (identity in the changelog case).
        def lookupcl(x):
            c = cl.read(x)
            clrevorder[x] = len(clrevorder)
            n = c[0]
            # record the first changeset introducing this manifest version
            mfs.setdefault(n, x)
            # Record a complete list of potentially-changed files in
            # this manifest.
            changedfiles.update(c[3])
            return x

        self._verbosenote(_('uncompressed size of bundle content:\n'))
        size = 0
        for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
            size += len(chunk)
            yield chunk
        self._verbosenote(_('%8.i (changelog)\n') % size)

        # We need to make sure that the linkrev in the changegroup refers to
        # the first changeset that introduced the manifest or file revision.
        # The fastpath is usually safer than the slowpath, because the
        # filelogs are walked in revlog order.
        #
        # When taking the slowpath with reorder=None and the manifest revlog
        # uses generaldelta, the manifest may be walked in the "wrong" order.
        # Without 'clrevorder', we would get an incorrect linkrev (see fix in
        # cc0ff93d0c0c).
        #
        # When taking the fastpath, we are only vulnerable to reordering
        # of the changelog itself. The changelog never uses generaldelta, so
        # it is only reordered when reorder=True. To handle this case, we
        # simply take the slowpath, which already has the 'clrevorder' logic.
        # This was also fixed in cc0ff93d0c0c.
        fastpathlinkrev = fastpathlinkrev and not self._reorder
        # Treemanifests don't work correctly with fastpathlinkrev
        # either, because we don't discover which directory nodes to
        # send along with files. This could probably be fixed.
        fastpathlinkrev = fastpathlinkrev and (
            'treemanifest' not in repo.requirements)

        for chunk in self.generatemanifests(commonrevs, clrevorder,
                                            fastpathlinkrev, mfs, fnodes):
            yield chunk
        mfs.clear()
        clrevs = set(cl.rev(x) for x in clnodes)

        if not fastpathlinkrev:
            def linknodes(unused, fname):
                return fnodes.get(fname, {})
        else:
            cln = cl.node
            def linknodes(filerevlog, fname):
                llr = filerevlog.linkrev
                fln = filerevlog.node
                revs = ((r, llr(r)) for r in filerevlog)
                return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)

        for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
                                        source):
            yield chunk

        yield self.close()

        if clnodes:
            repo.hook('outgoing', node=hex(clnodes[0]), source=source)
651
651
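
    # Taken together, generate() fixes the overall stream layout: one
    # changelog group, then the manifest groups, then one group per changed
    # file (each preceded by a file name header), and finally an empty chunk
    # marking the end of the stream.
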
    def generatemanifests(self, commonrevs, clrevorder, fastpathlinkrev, mfs,
                          fnodes):
        repo = self._repo
        mfl = repo.manifestlog
        dirlog = mfl._revlog.dirlog
        tmfnodes = {'': mfs}

        # Callback for the manifest, used to collect linkrevs for filelog
        # revisions.
        # Returns the linkrev node (collected in lookupcl).
        def makelookupmflinknode(dir):
            if fastpathlinkrev:
                assert not dir
                return mfs.__getitem__

            def lookupmflinknode(x):
                """Callback for looking up the linknode for manifests.

                Returns the linkrev node for the specified manifest.

                SIDE EFFECT:

                1) fclnodes gets populated with the list of relevant
                   file nodes if we're not using fastpathlinkrev
                2) When treemanifests are in use, collects treemanifest nodes
                   to send

                Note that this means manifests must be completely sent to
                the client before you can trust the list of files and
                treemanifests to send.
                """
                clnode = tmfnodes[dir][x]
                mdata = mfl.get(dir, x).readfast(shallow=True)
                for p, n, fl in mdata.iterentries():
                    if fl == 't':  # subdirectory manifest
                        subdir = dir + p + '/'
                        tmfclnodes = tmfnodes.setdefault(subdir, {})
                        tmfclnode = tmfclnodes.setdefault(n, clnode)
                        if clrevorder[clnode] < clrevorder[tmfclnode]:
                            tmfclnodes[n] = clnode
                    else:
                        f = dir + p
                        fclnodes = fnodes.setdefault(f, {})
                        fclnode = fclnodes.setdefault(n, clnode)
                        if clrevorder[clnode] < clrevorder[fclnode]:
                            fclnodes[n] = clnode
                return clnode
            return lookupmflinknode

        size = 0
        while tmfnodes:
            dir = min(tmfnodes)
            nodes = tmfnodes[dir]
            prunednodes = self.prune(dirlog(dir), nodes, commonrevs)
            if not dir or prunednodes:
                for x in self._packmanifests(dir, prunednodes,
                                             makelookupmflinknode(dir)):
                    size += len(x)
                    yield x
            del tmfnodes[dir]
        self._verbosenote(_('%8.i (manifests)\n') % size)
        yield self._manifestsdone()
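
    # With flat manifests, tmfnodes only ever holds the root entry '', so the
    # loop above runs exactly once. With treemanifests, emitting a directory's
    # chunks populates tmfnodes with its subdirectories (via lookupmflinknode),
    # so a parent directory is always sent before the directories it
    # references; min() merely makes the visiting order deterministic.
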
    # The 'source' parameter is useful for extensions
    def generatefiles(self, changedfiles, linknodes, commonrevs, source):
        repo = self._repo
        progress = self._progress
        msgbundling = _('bundling')

        total = len(changedfiles)
        # for progress output
        msgfiles = _('files')
        for i, fname in enumerate(sorted(changedfiles)):
            filerevlog = repo.file(fname)
            if not filerevlog:
                raise error.Abort(_("empty or missing revlog for %s") % fname)

            linkrevnodes = linknodes(filerevlog, fname)
            # Lookup for filenodes; we collected the linkrev nodes above in
            # the fastpath case and with lookupmflinknode in the slowpath
            # case.
            def lookupfilelog(x):
                return linkrevnodes[x]

            filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
            if filenodes:
                progress(msgbundling, i + 1, item=fname, unit=msgfiles,
                         total=total)
                h = self.fileheader(fname)
                size = len(h)
                yield h
                for chunk in self.group(filenodes, filerevlog, lookupfilelog):
                    size += len(chunk)
                    yield chunk
                self._verbosenote(_('%8.i %s\n') % (size, fname))
        progress(msgbundling, None)

    def deltaparent(self, revlog, rev, p1, p2, prev):
        return prev

    def revchunk(self, revlog, rev, prev, linknode):
        node = revlog.node(rev)
        p1, p2 = revlog.parentrevs(rev)
        base = self.deltaparent(revlog, rev, p1, p2, prev)

        prefix = ''
        if revlog.iscensored(base) or revlog.iscensored(rev):
            try:
                delta = revlog.revision(node, raw=True)
            except error.CensoredNodeError as e:
                delta = e.tombstone
            if base == nullrev:
                prefix = mdiff.trivialdiffheader(len(delta))
            else:
                baselen = revlog.rawsize(base)
                prefix = mdiff.replacediffheader(baselen, len(delta))
        elif base == nullrev:
            delta = revlog.revision(node, raw=True)
            prefix = mdiff.trivialdiffheader(len(delta))
        else:
            delta = revlog.revdiff(base, rev)
        p1n, p2n = revlog.parents(node)
        basenode = revlog.node(base)
        flags = revlog.flags(rev)
        meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
        meta += prefix
        l = len(meta) + len(delta)
        yield chunkheader(l)
        yield meta
        yield delta

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # do nothing with basenode, it is implicitly the previous one in HG10
        # do nothing with flags, it is implicitly 0 for cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
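
# The three packer classes differ mainly in their delta header layout. A
# rough sketch of the sizes involved (the format strings here are assumptions
# inferred from the pack() calls; the real constants are defined near the top
# of this file):
#
#   import struct
#   struct.calcsize("20s20s20s20s")       # cg1: node, p1, p2, linknode -> 80
#   struct.calcsize("20s20s20s20s20s")    # cg2 adds explicit basenode -> 100
#   struct.calcsize(">20s20s20s20s20sH")  # cg3 adds 16-bit flags -> 102
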
class cg2packer(cg1packer):
    version = '02'
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER

    def __init__(self, repo, bundlecaps=None):
        super(cg2packer, self).__init__(repo, bundlecaps)
        if self._reorder is None:
            # Since generaldelta is directly supported by cg2, reordering
            # generally doesn't help, so we disable it by default (treating
            # bundle.reorder=auto just like bundle.reorder=False).
            self._reorder = False

    def deltaparent(self, revlog, rev, p1, p2, prev):
        dp = revlog.deltaparent(rev)
        if dp == nullrev and revlog.storedeltachains:
            # Avoid sending full revisions when delta parent is null. Pick prev
            # in that case. It's tempting to pick p1 in this case, as p1 will
            # be smaller in the common case. However, computing a delta against
            # p1 may require resolving the raw text of p1, which could be
            # expensive. The revlog caches should have prev cached, meaning
            # less CPU for changegroup generation. There is likely room to add
            # a flag and/or config option to control this behavior.
            return prev
        elif dp == nullrev:
            # revlog is configured to use full snapshots for a reason;
            # stick to full snapshots.
            return nullrev
        elif dp not in (p1, p2, prev):
            # Pick prev when we can't be sure remote has the base revision.
            return prev
        else:
            return dp

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # Do nothing with flags; it is implicitly 0 in cg1 and cg2.
        return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
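
# A condensed restatement of cg2packer.deltaparent's base selection, written
# as a hypothetical standalone helper for illustration only (it is not used
# anywhere in this module); its behavior should match the method above.
def _sketchdeltabase(dp, p1, p2, prev, storedeltachains):
    if dp == nullrev:
        # A full snapshot is stored. When delta chains are allowed, prefer a
        # delta against prev, which the revlog caches make cheap to compute.
        return prev if storedeltachains else nullrev
    if dp not in (p1, p2, prev):
        # The receiver may not have the stored base; fall back to prev.
        return prev
    return dp
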
class cg3packer(cg2packer):
    version = '03'
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        if dir:
            yield self.fileheader(dir)

        dirlog = self._repo.manifestlog._revlog.dirlog(dir)
        for chunk in self.group(mfnodes, dirlog, lookuplinknode,
                                units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        return self.close()

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        return struct.pack(
            self.deltaheader, node, p1n, p2n, basenode, linknode, flags)

_packermap = {'01': (cg1packer, cg1unpacker),
              # cg2 adds support for exchanging generaldelta
              '02': (cg2packer, cg2unpacker),
              # cg3 adds support for exchanging revlog flags and treemanifests
              '03': (cg3packer, cg3unpacker),
             }
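
# Resolving the concrete classes for a given version is a plain dict lookup;
# for example (illustrative):
#
#   packercls, unpackercls = _packermap['02']   # (cg2packer, cg2unpacker)
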
def allsupportedversions(repo):
    versions = set(_packermap.keys())
    if not (repo.ui.configbool('experimental', 'changegroup3') or
            repo.ui.configbool('experimental', 'treemanifest') or
            'treemanifest' in repo.requirements):
        versions.discard('03')
    return versions

# Changegroup versions that can be applied to the repo
def supportedincomingversions(repo):
    return allsupportedversions(repo)

# Changegroup versions that can be created from the repo
def supportedoutgoingversions(repo):
    versions = allsupportedversions(repo)
    if 'treemanifest' in repo.requirements:
        # Versions 01 and 02 support only flat manifests and it's just too
        # expensive to convert between the flat manifest and tree manifest on
        # the fly. Since tree manifests are hashed differently, all of history
        # would have to be converted. Instead, we simply don't even pretend to
        # support versions 01 and 02.
        versions.discard('01')
        versions.discard('02')
    return versions

def safeversion(repo):
    # Finds the smallest version that it's safe to assume clients of the repo
    # will support. For example, all hg versions that support generaldelta also
    # support changegroup 02.
    versions = supportedoutgoingversions(repo)
    if 'generaldelta' in repo.requirements:
        versions.discard('01')
    assert versions
    return min(versions)

def getbundler(version, repo, bundlecaps=None):
    assert version in supportedoutgoingversions(repo)
    return _packermap[version][0](repo, bundlecaps)

def getunbundler(version, fh, alg, extras=None):
    return _packermap[version][1](fh, alg, extras=extras)
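
# Typical pairing of the helpers above (sketch; 'repo' is assumed to be a
# local repository object and 'fh' a binary stream holding the generated
# chunks):
#
#   version = safeversion(repo)
#   bundler = getbundler(version, repo)
#   ...generate chunks with the bundler and write them to fh...
#   cg = getunbundler(version, fh, None)   # None: stream is uncompressed
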
def _changegroupinfo(repo, nodes, source):
    if repo.ui.verbose or source == 'bundle':
        repo.ui.status(_("%d changesets found\n") % len(nodes))
    if repo.ui.debugflag:
        repo.ui.debug("list of changesets:\n")
        for node in nodes:
            repo.ui.debug("%s\n" % hex(node))

def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
    repo = repo.unfiltered()
    commonrevs = outgoing.common
    csets = outgoing.missing
    heads = outgoing.missingheads
    # We go through the fast path if we get told to, or if all (unfiltered)
    # heads have been requested (since we then know that all linkrevs will
    # be pulled by the client).
    heads.sort()
    fastpathlinkrev = fastpath or (
        repo.filtername is None and heads == sorted(repo.heads()))

    repo.hook('preoutgoing', throw=True, source=source)
    _changegroupinfo(repo, csets, source)
    return bundler.generate(commonrevs, csets, fastpathlinkrev, source)

def getsubset(repo, outgoing, bundler, source, fastpath=False):
    gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
    return getunbundler(bundler.version, util.chunkbuffer(gengroup), None,
                        {'clcount': len(outgoing.missing)})

def changegroupsubset(repo, roots, heads, source, version='01'):
    """Compute a changegroup consisting of all the nodes that are
    descendants of any of the roots and ancestors of any of the heads.
    Return a chunkbuffer object whose read() method will return
    successive changegroup chunks.

    It is fairly complex as determining which filenodes and which
    manifest nodes need to be included for the changeset to be complete
    is non-trivial.

    Another wrinkle is doing the reverse, figuring out which changeset in
    the changegroup a particular filenode or manifestnode belongs to.
    """
    outgoing = discovery.outgoing(repo, missingroots=roots, missingheads=heads)
    bundler = getbundler(version, repo)
    return getsubset(repo, outgoing, bundler, source)
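
# For instance, bundling everything between a known root and a single head
# (sketch; 'rootnode' and 'headnode' are illustrative placeholders):
#
#   cg = changegroupsubset(repo, [rootnode], [headnode], 'serve')
#   # 'cg' is an unpacker whose read() returns successive raw chunks
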
def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
                           version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing. Returns a raw changegroup generator."""
    if not outgoing.missing:
        return None
    bundler = getbundler(version, repo, bundlecaps)
    return getsubsetraw(repo, outgoing, bundler, source)

def getchangegroup(repo, source, outgoing, bundlecaps=None,
                   version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing."""
    if not outgoing.missing:
        return None
    bundler = getbundler(version, repo, bundlecaps)
    return getsubset(repo, outgoing, bundler, source)

def getlocalchangegroup(repo, *args, **kwargs):
    repo.ui.deprecwarn('getlocalchangegroup is deprecated, use getchangegroup',
                       '4.3')
    return getchangegroup(repo, *args, **kwargs)

def changegroup(repo, basenodes, source):
    # to avoid a race we use changegroupsubset() (issue1320)
    return changegroupsubset(repo, basenodes, repo.heads(), source)

def _addchangegroupfiles(repo, source, revmap, trp, expectedfiles, needfiles):
    revisions = 0
    files = 0
    for chunkdata in iter(source.filelogheader, {}):
        files += 1
        f = chunkdata["filename"]
        repo.ui.debug("adding %s revisions\n" % f)
        repo.ui.progress(_('files'), files, unit=_('files'),
                         total=expectedfiles)
        fl = repo.file(f)
        o = len(fl)
        try:
            if not fl.addgroup(source, revmap, trp):
                raise error.Abort(_("received file revlog group is empty"))
        except error.CensoredBaseError as e:
            raise error.Abort(_("received delta base is censored: %s") % e)
        revisions += len(fl) - o
        if f in needfiles:
            needs = needfiles[f]
            for new in xrange(o, len(fl)):
                n = fl.node(new)
                if n in needs:
                    needs.remove(n)
                else:
                    raise error.Abort(
                        _("received spurious file revlog entry"))
            if not needs:
                del needfiles[f]
    repo.ui.progress(_('files'), None)

    for f, needs in needfiles.iteritems():
        fl = repo.file(f)
        for n in needs:
            try:
                fl.rev(n)
            except error.LookupError:
                raise error.Abort(
                    _('missing file data for %s:%s - run hg verify') %
                    (f, hex(n)))

    return revisions, files
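
# The needfiles bookkeeping above runs in two passes: while filelogs are being
# extended, every expected node that arrives in this changegroup is checked
# off (and unexpected ones abort the transfer); whatever is still listed
# afterwards must already exist in the local filelogs, otherwise the operation
# is aborted with a hint to run 'hg verify'.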