bundle2: only seek to beginning of part in bundlerepo...
Gregory Szorc -
r35112:2b72bc88 default
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows:

 - magic string
 - stream level parameters
 - payload parts (any number)
 - end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

The binary format is as follows:

:params size: int32

  The total number of bytes used by the parameters

:params value: arbitrary number of bytes

  A blob of `params size` bytes containing the serialized version of all
  stream level parameters.

  The blob contains a space separated list of parameters. Parameters with a
  value are stored in the form `<name>=<value>`. Both name and value are
  urlquoted.

  Empty names are forbidden.

  Names MUST start with a letter. If this first letter is lower case, the
  parameter is advisory and can be safely ignored. However when the first
  letter is capital, the parameter is mandatory and the bundling process MUST
  stop if it is not able to process it.

Stream parameters use a simple textual format for two main reasons:

- Stream level parameters should remain simple and we want to discourage any
  crazy usage.
- Textual data allow easy human inspection of a bundle2 header in case of
  trouble.

Any application level options MUST go into a bundle2 part instead.
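The stream-level parameter rules above (space separated, urlquoted `<name>=<value>` pairs, mandatory when capitalized) can be sketched as a small decoder. This is a minimal illustration, not Mercurial's actual parser, and the name `parsestreamparams` is made up:

```python
from urllib.parse import unquote

def parsestreamparams(blob):
    """Decode a space separated list of urlquoted <name>=<value> pairs.

    Returns (params, mandatory) where mandatory is the set of parameter
    names whose first letter is upper case.
    """
    params = {}
    mandatory = set()
    for item in blob.split(' '):
        if not item:
            continue
        name, sep, value = item.partition('=')
        name = unquote(name)
        # Names must start with a letter; empty names are forbidden.
        if not name or not name[0].isalpha():
            raise ValueError('malformed stream parameter: %s' % item)
        params[name] = unquote(value) if sep else None
        if name[0].isupper():
            mandatory.add(name)
    return params, mandatory

params, mandatory = parsestreamparams('Compression=GZ foo=bar%20baz')
```

A value-less parameter (no `=`) decodes to `None`, mirroring the advisory/flag style of use.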

Payload part
------------------------

The binary format is as follows:

:header size: int32

    The total number of bytes used by the part header. When the header is
    empty (size = 0) this is interpreted as the end of stream marker.

:header:

    The header defines how to interpret the part. It contains two pieces of
    data: the part type, and the part parameters.

    The part type is used to route the part to an application level handler
    that can interpret the payload.

    Part parameters are passed to the application level handler. They are
    meant to convey information that will help the application level object
    interpret the part payload.

    The binary format of the header is as follows:

    :typesize: (one byte)

    :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

    :partid: A 32-bit integer (unique in the bundle) that can be used to
             refer to this part.

    :parameters:

        A part's parameters may have arbitrary content; the binary structure
        is::

            <mandatory-count><advisory-count><param-sizes><param-data>

        :mandatory-count: 1 byte, number of mandatory parameters

        :advisory-count: 1 byte, number of advisory parameters

        :param-sizes:

            N couples of bytes, where N is the total number of parameters.
            Each couple contains (<size-of-key>, <size-of-value>) for one
            parameter.

        :param-data:

            A blob of bytes from which each parameter key and value can be
            retrieved using the list of size couples stored in the previous
            field.

        Mandatory parameters come first, then the advisory ones.

        Each parameter's key MUST be unique within the part.
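The header layout above maps directly onto the struct formats used later in this module (`>B` for typesize, `>I` for partid, `>BB` for the parameter counts). The following is a self-contained sketch of a header decoder; `parsepartheader` is a hypothetical name, not Mercurial's API:

```python
import struct

def parsepartheader(data):
    """Parse a bundle2 part header blob (the bytes that follow the int32
    header size field). A sketch only, not Mercurial's implementation."""
    offset = 0
    typesize = struct.unpack_from('>B', data, offset)[0]
    offset += 1
    parttype = data[offset:offset + typesize].decode('ascii')
    offset += typesize
    partid = struct.unpack_from('>I', data, offset)[0]
    offset += 4
    mancount, advcount = struct.unpack_from('>BB', data, offset)
    offset += 2
    total = mancount + advcount
    # One (key size, value size) couple per parameter.
    sizes = struct.unpack_from('>' + 'BB' * total, data, offset)
    offset += 2 * total
    params = []
    for i in range(total):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        key = data[offset:offset + ksize]
        offset += ksize
        value = data[offset:offset + vsize]
        offset += vsize
        # Mandatory parameters come first in the blob.
        params.append((key, value, i < mancount))
    return parttype, partid, params

# A tiny hand-built header: part type 'output', id 7, one mandatory
# parameter ('in-reply-to' -> '3').
header = (struct.pack('>B', 6) + b'output' + struct.pack('>I', 7) +
          struct.pack('>BB', 1, 0) + struct.pack('>BB', 11, 1) +
          b'in-reply-to' + b'3')
parttype, partid, params = parsepartheader(header)
```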

:payload:

    payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.
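The chunked payload framing can be consumed with a short generator. This is a sketch of the framing only; since no negative-size special cases exist yet, this version simply rejects them:

```python
import io
import struct

def iterchunks(fp):
    """Yield payload chunks from a file-like object until the zero size
    chunk that concludes the payload."""
    while True:
        # Each chunk is prefixed by a signed big-endian int32 size.
        chunksize = struct.unpack('>i', fp.read(4))[0]
        if chunksize == 0:
            return  # end of payload marker
        if chunksize < 0:
            raise ValueError('special chunk sizes not supported: %d'
                             % chunksize)
        yield fp.read(chunksize)

payload = struct.pack('>i', 5) + b'hello' + struct.pack('>i', 0)
chunks = list(iterchunks(io.BytesIO(payload)))
```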

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part
type contains any uppercase char it is considered mandatory. When no handler
is known for a mandatory part, the process is aborted and an exception is
raised. If the part is advisory and no handler is known, the part is ignored.
When the process is aborted, the full bundle is still read from the stream to
keep the channel usable. But none of the parts read after an abort are
processed. In the future, dropping the stream may become an option for
channels we do not care to preserve.
"""
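The case-insensitive matching and uppercase-means-mandatory rules described in the docstring can be sketched in a few lines. This is an illustration of the rules, not Mercurial's dispatch code, and `dispatch`/`ismandatory` are made-up names:

```python
def ismandatory(parttype):
    # Any uppercase character in the part type marks the part mandatory.
    return parttype != parttype.lower()

def dispatch(handlers, parttype):
    """Look up a part handler case-insensitively.

    Unknown mandatory parts abort; unknown advisory parts are ignored.
    """
    handler = handlers.get(parttype.lower())
    if handler is None:
        if ismandatory(parttype):
            raise KeyError('unknown mandatory part: %s' % parttype)
        return None  # advisory part with no handler: ignored
    return handler

handlers = {'changegroup': lambda op, part: 'applied'}
```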

from __future__ import absolute_import, division

import errno
import os
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    node as nodemod,
    obsolete,
    phases,
    pushkey,
    pycompat,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>' + ('BB' * nbparams)
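For example, for two parameters the generated format covers two (key size, value size) couples. The sketch below repeats the one-line helper so the snippet is self-contained:

```python
import struct

def _makefpartparamsizes(nbparams):
    # Same one-liner as in the module, repeated so this example runs
    # on its own.
    return '>' + ('BB' * nbparams)

fmt = _makefpartparamsizes(2)        # two couples of unsigned bytes
data = struct.pack(fmt, 3, 5, 4, 0)  # e.g. key/value sizes 3,5 and 4,0
sizes = struct.unpack(fmt, data)
```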

parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`. Where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__
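The record-keeping behavior documented on `unbundlerecords` can be demonstrated with a trimmed, self-contained copy of the class (only the methods the demo touches are kept; this is not the module's full implementation):

```python
class unbundlerecords(object):
    # Trimmed copy of the class above, just enough for a runnable demo.
    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        # One nested records object per part id, created on demand.
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

records = unbundlerecords()
records.add('changegroup', {'return': 1})
records.add('output', 'remote: ok', inreplyto=0)
```

Records tagged with `inreplyto` are mirrored into a per-part sub-records object, which is how reply parts are later associated with the part they answer.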
278
278
279 class bundleoperation(object):
279 class bundleoperation(object):
280 """an object that represents a single bundling process
280 """an object that represents a single bundling process
281
281
282 Its purpose is to carry unbundle-related objects and states.
282 Its purpose is to carry unbundle-related objects and states.
283
283
284 A new object should be created at the beginning of each bundle processing.
284 A new object should be created at the beginning of each bundle processing.
285 The object is to be returned by the processing function.
285 The object is to be returned by the processing function.
286
286
287 The object has very little content now it will ultimately contain:
287 The object has very little content now it will ultimately contain:
288 * an access to the repo the bundle is applied to,
288 * an access to the repo the bundle is applied to,
289 * a ui object,
289 * a ui object,
290 * a way to retrieve a transaction to add changes to the repo,
290 * a way to retrieve a transaction to add changes to the repo,
291 * a way to record the result of processing each part,
291 * a way to record the result of processing each part,
292 * a way to construct a bundle response when applicable.
292 * a way to construct a bundle response when applicable.
293 """
293 """
294
294
295 def __init__(self, repo, transactiongetter, captureoutput=True):
295 def __init__(self, repo, transactiongetter, captureoutput=True):
296 self.repo = repo
296 self.repo = repo
297 self.ui = repo.ui
297 self.ui = repo.ui
298 self.records = unbundlerecords()
298 self.records = unbundlerecords()
299 self.reply = None
299 self.reply = None
300 self.captureoutput = captureoutput
300 self.captureoutput = captureoutput
301 self.hookargs = {}
301 self.hookargs = {}
302 self._gettransaction = transactiongetter
302 self._gettransaction = transactiongetter
303
303
304 def gettransaction(self):
304 def gettransaction(self):
305 transaction = self._gettransaction()
305 transaction = self._gettransaction()
306
306
307 if self.hookargs:
307 if self.hookargs:
308 # the ones added to the transaction supercede those added
308 # the ones added to the transaction supercede those added
309 # to the operation.
309 # to the operation.
310 self.hookargs.update(transaction.hookargs)
310 self.hookargs.update(transaction.hookargs)
311 transaction.hookargs = self.hookargs
311 transaction.hookargs = self.hookargs
312
312
313 # mark the hookargs as flushed. further attempts to add to
313 # mark the hookargs as flushed. further attempts to add to
314 # hookargs will result in an abort.
314 # hookargs will result in an abort.
315 self.hookargs = None
315 self.hookargs = None
316
316
317 return transaction
317 return transaction
318
318
319 def addhookargs(self, hookargs):
319 def addhookargs(self, hookargs):
320 if self.hookargs is None:
320 if self.hookargs is None:
321 raise error.ProgrammingError('attempted to add hookargs to '
321 raise error.ProgrammingError('attempted to add hookargs to '
322 'operation after transaction started')
322 'operation after transaction started')
323 self.hookargs.update(hookargs)
323 self.hookargs.update(hookargs)
324
324
325 class TransactionUnavailable(RuntimeError):
325 class TransactionUnavailable(RuntimeError):
326 pass
326 pass
327
327
328 def _notransaction():
328 def _notransaction():
329 """default method to get a transaction while processing a bundle
329 """default method to get a transaction while processing a bundle
330
330
331 Raise an exception to highlight the fact that no transaction was expected
331 Raise an exception to highlight the fact that no transaction was expected
332 to be created"""
332 to be created"""
333 raise TransactionUnavailable()
333 raise TransactionUnavailable()
334
334
335 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
335 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
336 # transform me into unbundler.apply() as soon as the freeze is lifted
336 # transform me into unbundler.apply() as soon as the freeze is lifted
337 if isinstance(unbundler, unbundle20):
337 if isinstance(unbundler, unbundle20):
338 tr.hookargs['bundle2'] = '1'
338 tr.hookargs['bundle2'] = '1'
339 if source is not None and 'source' not in tr.hookargs:
339 if source is not None and 'source' not in tr.hookargs:
340 tr.hookargs['source'] = source
340 tr.hookargs['source'] = source
341 if url is not None and 'url' not in tr.hookargs:
341 if url is not None and 'url' not in tr.hookargs:
342 tr.hookargs['url'] = url
342 tr.hookargs['url'] = url
343 return processbundle(repo, unbundler, lambda: tr)
343 return processbundle(repo, unbundler, lambda: tr)
344 else:
344 else:
345 # the transactiongetter won't be used, but we might as well set it
345 # the transactiongetter won't be used, but we might as well set it
346 op = bundleoperation(repo, lambda: tr)
346 op = bundleoperation(repo, lambda: tr)
347 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
347 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
348 return op
348 return op
349
349
350 class partiterator(object):
350 class partiterator(object):
351 def __init__(self, repo, op, unbundler):
351 def __init__(self, repo, op, unbundler):
352 self.repo = repo
352 self.repo = repo
353 self.op = op
353 self.op = op
354 self.unbundler = unbundler
354 self.unbundler = unbundler
355 self.iterator = None
355 self.iterator = None
356 self.count = 0
356 self.count = 0
357 self.current = None
357 self.current = None
358
358
359 def __enter__(self):
359 def __enter__(self):
360 def func():
360 def func():
361 itr = enumerate(self.unbundler.iterparts())
361 itr = enumerate(self.unbundler.iterparts())
362 for count, p in itr:
362 for count, p in itr:
363 self.count = count
363 self.count = count
364 self.current = p
364 self.current = p
365 yield p
365 yield p
366 p.consume()
366 p.consume()
367 self.current = None
367 self.current = None
368 self.iterator = func()
368 self.iterator = func()
369 return self.iterator
369 return self.iterator
370
370
371 def __exit__(self, type, exc, tb):
371 def __exit__(self, type, exc, tb):
372 if not self.iterator:
372 if not self.iterator:
373 return
373 return
374
374
375 # Only gracefully abort in a normal exception situation. User aborts
375 # Only gracefully abort in a normal exception situation. User aborts
376 # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
376 # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
377 # and should not gracefully cleanup.
377 # and should not gracefully cleanup.
378 if isinstance(exc, Exception):
378 if isinstance(exc, Exception):
379 # Any exceptions seeking to the end of the bundle at this point are
379 # Any exceptions seeking to the end of the bundle at this point are
380 # almost certainly related to the underlying stream being bad.
380 # almost certainly related to the underlying stream being bad.
381 # And, chances are that the exception we're handling is related to
381 # And, chances are that the exception we're handling is related to
382 # getting in that bad state. So, we swallow the seeking error and
382 # getting in that bad state. So, we swallow the seeking error and
383 # re-raise the original error.
383 # re-raise the original error.
384 seekerror = False
384 seekerror = False
385 try:
385 try:
386 if self.current:
386 if self.current:
387 # consume the part content to not corrupt the stream.
387 # consume the part content to not corrupt the stream.
388 self.current.consume()
388 self.current.consume()
389
389
390 for part in self.iterator:
390 for part in self.iterator:
391 # consume the bundle content
391 # consume the bundle content
392 part.consume()
392 part.consume()
393 except Exception:
393 except Exception:
394 seekerror = True
394 seekerror = True
395
395
396 # Small hack to let caller code distinguish exceptions from bundle2
396 # Small hack to let caller code distinguish exceptions from bundle2
397 # processing from processing the old format. This is mostly needed
397 # processing from processing the old format. This is mostly needed
398 # to handle different return codes to unbundle according to the type
398 # to handle different return codes to unbundle according to the type
399 # of bundle. We should probably clean up or drop this return code
399 # of bundle. We should probably clean up or drop this return code
400 # craziness in a future version.
400 # craziness in a future version.
401 exc.duringunbundle2 = True
401 exc.duringunbundle2 = True
402 salvaged = []
402 salvaged = []
403 replycaps = None
403 replycaps = None
404 if self.op.reply is not None:
404 if self.op.reply is not None:
405 salvaged = self.op.reply.salvageoutput()
405 salvaged = self.op.reply.salvageoutput()
406 replycaps = self.op.reply.capabilities
406 replycaps = self.op.reply.capabilities
407 exc._replycaps = replycaps
407 exc._replycaps = replycaps
408 exc._bundle2salvagedoutput = salvaged
408 exc._bundle2salvagedoutput = salvaged
409
409
410 # Re-raising from a variable loses the original stack. So only use
410 # Re-raising from a variable loses the original stack. So only use
411 # that form if we need to.
411 # that form if we need to.
412 if seekerror:
412 if seekerror:
413 raise exc
413 raise exc
414
414
415 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
415 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
416 self.count)
416 self.count)
417
417
418 def processbundle(repo, unbundler, transactiongetter=None, op=None):
418 def processbundle(repo, unbundler, transactiongetter=None, op=None):
419 """This function process a bundle, apply effect to/from a repo
419 """This function process a bundle, apply effect to/from a repo
420
420
421 It iterates over each part then searches for and uses the proper handling
421 It iterates over each part then searches for and uses the proper handling
422 code to process the part. Parts are processed in order.
422 code to process the part. Parts are processed in order.
423
423
424 Unknown Mandatory part will abort the process.
424 Unknown Mandatory part will abort the process.
425
425
426 It is temporarily possible to provide a prebuilt bundleoperation to the
426 It is temporarily possible to provide a prebuilt bundleoperation to the
427 function. This is used to ensure output is properly propagated in case of
427 function. This is used to ensure output is properly propagated in case of
428 an error during the unbundling. This output capturing part will likely be
428 an error during the unbundling. This output capturing part will likely be
429 reworked and this ability will probably go away in the process.
429 reworked and this ability will probably go away in the process.
430 """
430 """
431 if op is None:
431 if op is None:
432 if transactiongetter is None:
432 if transactiongetter is None:
433 transactiongetter = _notransaction
433 transactiongetter = _notransaction
434 op = bundleoperation(repo, transactiongetter)
434 op = bundleoperation(repo, transactiongetter)
435 # todo:
435 # todo:
436 # - replace this is a init function soon.
436 # - replace this is a init function soon.
437 # - exception catching
437 # - exception catching
438 unbundler.params
438 unbundler.params
439 if repo.ui.debugflag:
439 if repo.ui.debugflag:
440 msg = ['bundle2-input-bundle:']
440 msg = ['bundle2-input-bundle:']
441 if unbundler.params:
441 if unbundler.params:
442 msg.append(' %i params' % len(unbundler.params))
442 msg.append(' %i params' % len(unbundler.params))
443 if op._gettransaction is None or op._gettransaction is _notransaction:
443 if op._gettransaction is None or op._gettransaction is _notransaction:
444 msg.append(' no-transaction')
444 msg.append(' no-transaction')
445 else:
445 else:
446 msg.append(' with-transaction')
446 msg.append(' with-transaction')
447 msg.append('\n')
447 msg.append('\n')
448 repo.ui.debug(''.join(msg))
448 repo.ui.debug(''.join(msg))
449
449
450 processparts(repo, op, unbundler)
450 processparts(repo, op, unbundler)
451
451
452 return op
452 return op
453
453
454 def processparts(repo, op, unbundler):
454 def processparts(repo, op, unbundler):
455 with partiterator(repo, op, unbundler) as parts:
455 with partiterator(repo, op, unbundler) as parts:
456 for part in parts:
456 for part in parts:
457 _processpart(op, part)
457 _processpart(op, part)
458
458
459 def _processchangegroup(op, cg, tr, source, url, **kwargs):
459 def _processchangegroup(op, cg, tr, source, url, **kwargs):
460 ret = cg.apply(op.repo, tr, source, url, **kwargs)
460 ret = cg.apply(op.repo, tr, source, url, **kwargs)
461 op.records.add('changegroup', {
461 op.records.add('changegroup', {
462 'return': ret,
462 'return': ret,
463 })
463 })
464 return ret
464 return ret
465
465
466 def _gethandler(op, part):
466 def _gethandler(op, part):
467 status = 'unknown' # used by debug output
    status = 'unknown' # used by debug output
    try:
        handler = parthandlermapping.get(part.type)
        if handler is None:
            status = 'unsupported-type'
            raise error.BundleUnknownFeatureError(parttype=part.type)
        indebug(op.ui, 'found a handler for part %s' % part.type)
        unknownparams = part.mandatorykeys - handler.params
        if unknownparams:
            unknownparams = list(unknownparams)
            unknownparams.sort()
            status = 'unsupported-params (%s)' % ', '.join(unknownparams)
            raise error.BundleUnknownFeatureError(parttype=part.type,
                                                 params=unknownparams)
        status = 'supported'
    except error.BundleUnknownFeatureError as exc:
        if part.mandatory: # mandatory parts
            raise
        indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
        return # skip to part processing
    finally:
        if op.ui.debugflag:
            msg = ['bundle2-input-part: "%s"' % part.type]
            if not part.mandatory:
                msg.append(' (advisory)')
            nbmp = len(part.mandatorykeys)
            nbap = len(part.params) - nbmp
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            msg.append(' %s\n' % status)
            op.ui.debug(''.join(msg))

    return handler

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    handler = _gethandler(op, part)
    if handler is None:
        return

    # handler is called outside the above try block so that we don't
    # risk catching KeyErrors from anything other than the
    # parthandlermapping lookup (any KeyError raised by handler()
    # itself represents a defect of a different variety).
    output = None
    if op.captureoutput and op.reply is not None:
        op.ui.pushbuffer(error=True, subproc=True)
        output = ''
    try:
        handler(op, part)
    finally:
        if output is not None:
            output = op.ui.popbuffer()
        if output:
            outpart = op.reply.newpart('output', data=output,
                                       mandatory=False)
            outpart.addparam(
                'in-reply-to', pycompat.bytestr(part.id), mandatory=False)

def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

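The two functions above are inverses. A standalone restatement of the round-trip, with the stdlib `urllib.parse` standing in for Mercurial's `urlreq` wrapper and `str` standing in for the in-tree bytes handling (both simplifications for illustration):

```python
# Sketch mirroring decodecaps/encodecaps above; uses urllib.parse instead of
# Mercurial's urlreq, and str instead of bytes (illustrative assumptions).
from urllib.parse import quote, unquote

def encodecaps(caps):
    # one capability per line; values joined by commas, both URL-quoted
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v) for v in caps[ca]]
        ca = quote(ca)
        if vals:
            ca = '%s=%s' % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecaps(blob):
    # inverse of encodecaps: values always come back as a list,
    # empty when the capability carries no '=value' suffix
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps

blob = encodecaps({'HG20': [], 'changegroup': ['01', '02']})
# 'HG20\nchangegroup=01,02'
```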
bundletypes = {
    "": ("", 'UN'), # only when using unbundle on ssh and old http servers
                    # since the unification ssh accepts a header but there
                    # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

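A sketch of how the two tables are meant to be used together: a server walks its preference list and picks the first type the client also supports, then looks up the on-the-wire header and compression token. The selection helper below is hypothetical, not an in-tree function:

```python
# Hypothetical negotiation helper; the tables mirror bundletypes and
# bundlepriority above (comments omitted).
bundletypes = {
    "": ("", 'UN'),
    "HG20": (),          # special-cased by the callers
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

def pickbundletype(clientsupported):
    # first preferred type the client understands wins
    for btype in bundlepriority:
        if btype in clientsupported:
            return btype, bundletypes[btype]
    raise ValueError('no common bundle type')
```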
class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError(r'empty parameter name')
        if name[0:1] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError(r'non letter first character: %s' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we preserve
        server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged


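Taken together, `getchunks` and `_paramchunk` emit a fixed prologue: the magic string, a `>i` big-endian length, then the space-separated, URL-quoted parameter block. A standalone sketch of that framing (stdlib `urllib.parse` and `str` parameters stand in for Mercurial's `urlreq` and bytes handling, and the `>i` format mirrors `_fstreamparamsize` defined earlier in the file):

```python
import struct
from urllib.parse import quote

def streamprologue(params):
    # params: list of (name, value-or-None) pairs, as in bundle20._params
    blocks = []
    for par, value in params:
        par = quote(par)
        if value is not None:
            par = '%s=%s' % (par, quote(value))
        blocks.append(par)
    blob = ' '.join(blocks).encode('ascii')
    # magic string, then a big-endian signed 32-bit length, then the block
    return b'HG20' + struct.pack('>i', len(blob)) + blob

prologue = streamprologue([('Compression', 'BZ')])
# b'HG20' + 4-byte length + b'Compression=BZ'
```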
class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

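`_readexact` delegates to `changegroup.readexactly`, whose contract is to raise on a short read rather than silently returning fewer bytes. A minimal stand-in over `io.BytesIO`, sketching the contract rather than the in-tree implementation (it raises `ValueError` where Mercurial aborts):

```python
import io
import struct

def readexactly(stream, n):
    # read exactly n bytes, refusing short reads (the contract _readexact
    # relies on; the real readexactly lives in mercurial.changegroup)
    s = stream.read(n)
    if len(s) < n:
        raise ValueError('stream ended unexpectedly (got %d bytes, '
                         'expected %d)' % (len(s), n))
    return s

def unpackfrom(stream, fmt):
    # _unpack equivalent: size the read from the struct format itself
    data = readexactly(stream, struct.calcsize(fmt))
    return struct.unpack(fmt, data)
```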
def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

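The version dispatch in `getunbundler` can be sketched on its own: the first four bytes split into a two-byte magic, which must be `HG`, and a two-byte version that selects an unbundler class from `formatmap`. The map below is a stub holding a name instead of a class:

```python
def splitmagic(magicstring):
    # first two bytes must be 'HG'; the next two select the format version
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != b'HG':
        raise ValueError('not a Mercurial bundle')
    return version

formatmap = {b'20': 'unbundle20'}  # stub: the real map holds classes

def lookupversion(magicstring):
    version = splitmagic(magicstring)
    if version not in formatmap:
        raise ValueError('unknown bundle version %s' % version)
    return formatmap[version]
```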
class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process and apply all parameters from a raw parameter block"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory; this function will raise a BundleUnknownFeatureError when
        such a parameter is unknown.

        Note: no options are currently supported. Any input will be either
        ignored or rejected.
        """
        if not name:
            raise ValueError(r'empty parameter name')
        if name[0:1] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError(r'non letter first character: %s' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0:1].islower():
                indebug(self.ui, "ignoring unknown parameter %s" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends, relying on the bundle
        being interpreted to find its end. This is terrible and we are sorry,
        but we needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, the payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)


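The loop above depends on a uniform framing: every chunk is a `>i` big-endian size followed by that many bytes, `flaginterrupt` (-1) flags an inline interrupt, and two consecutive zero sizes end the stream. A self-contained sketch of a reader for that framing (simplified: it only skips the interrupt marker and raises plain `ValueError` instead of `BundleValueError`):

```python
import io
import struct

flaginterrupt = -1  # same sentinel value the bundle2 format uses

def iterchunks(fp):
    # yield raw payload chunks until two consecutive zero sizes are seen
    emptycount = 0
    while emptycount < 2:
        size = struct.unpack('>i', fp.read(4))[0]
        if size == 0:
            emptycount += 1
            continue
        emptycount = 0
        if size == flaginterrupt:
            continue  # interrupt marker: the real code processes an
                      # inline interrupt part here; this sketch skips it
        if size < 0:
            raise ValueError('negative chunk size: %i' % size)
        yield fp.read(size)

stream = io.BytesIO(
    struct.pack('>i', 3) + b'abc' +
    struct.pack('>i', 0) +          # end of first frame
    struct.pack('>i', 0))           # end of stream
chunks = list(iterchunks(stream))   # [b'abc']
```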
    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, the payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = seekableunbundlepart(self.ui, headerblock, self._fp)
            yield part
            # Ensure part is fully consumed so we can start reading the next
            # part.
            part.consume()
-           # But then seek back to the beginning so the code consuming this
-           # generator has a part that starts at 0.
-           part.seek(0, os.SEEK_SET)

            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True

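`b2streamparamhandler` is a plain registration decorator: it stores the function in a dict keyed by parameter name and returns it unchanged, so `_processparam` can later dispatch on the lowercased name. A generic sketch of the pattern (names here are illustrative, not in-tree):

```python
handlers = {}

def registerhandler(name):
    """register the decorated function under ``name``"""
    def decorator(func):
        assert name not in handlers, 'duplicate handler: %s' % name
        handlers[name] = func
        return func   # the function itself is returned unchanged
    return decorator

@registerhandler('compression')
def processcompression(value):
    return 'decompress with %s' % value

# dispatch by (lowercased) parameter name, as _processparam does
result = handlers['compression']('BZ')
```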
class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. Remote side
    should be able to safely ignore the advisory ones.

    Both data and parameters cannot be modified after the generation has begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
                % (cls, id(self), self.id, self.type, self.mandatory))

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
1009 msg = ['bundle2-output-part: "%s"' % self.type]
1007 msg = ['bundle2-output-part: "%s"' % self.type]
1010 if not self.mandatory:
1008 if not self.mandatory:
1011 msg.append(' (advisory)')
1009 msg.append(' (advisory)')
1012 nbmp = len(self.mandatoryparams)
1010 nbmp = len(self.mandatoryparams)
1013 nbap = len(self.advisoryparams)
1011 nbap = len(self.advisoryparams)
1014 if nbmp or nbap:
1012 if nbmp or nbap:
1015 msg.append(' (params:')
1013 msg.append(' (params:')
1016 if nbmp:
1014 if nbmp:
1017 msg.append(' %i mandatory' % nbmp)
1015 msg.append(' %i mandatory' % nbmp)
1018 if nbap:
1016 if nbap:
1019 msg.append(' %i advisory' % nbmp)
1017 msg.append(' %i advisory' % nbmp)
1020 msg.append(')')
1018 msg.append(')')
1021 if not self.data:
1019 if not self.data:
1022 msg.append(' empty payload')
1020 msg.append(' empty payload')
1023 elif (util.safehasattr(self.data, 'next')
1021 elif (util.safehasattr(self.data, 'next')
1024 or util.safehasattr(self.data, '__next__')):
1022 or util.safehasattr(self.data, '__next__')):
1025 msg.append(' streamed payload')
1023 msg.append(' streamed payload')
1026 else:
1024 else:
1027 msg.append(' %i bytes payload' % len(self.data))
1025 msg.append(' %i bytes payload' % len(self.data))
1028 msg.append('\n')
1026 msg.append('\n')
1029 ui.debug(''.join(msg))
1027 ui.debug(''.join(msg))
1030
1028
1031 #### header
1029 #### header
1032 if self.mandatory:
1030 if self.mandatory:
1033 parttype = self.type.upper()
1031 parttype = self.type.upper()
1034 else:
1032 else:
1035 parttype = self.type.lower()
1033 parttype = self.type.lower()
1036 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1034 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1037 ## parttype
1035 ## parttype
1038 header = [_pack(_fparttypesize, len(parttype)),
1036 header = [_pack(_fparttypesize, len(parttype)),
1039 parttype, _pack(_fpartid, self.id),
1037 parttype, _pack(_fpartid, self.id),
1040 ]
1038 ]
1041 ## parameters
1039 ## parameters
1042 # count
1040 # count
1043 manpar = self.mandatoryparams
1041 manpar = self.mandatoryparams
1044 advpar = self.advisoryparams
1042 advpar = self.advisoryparams
1045 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1043 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1046 # size
1044 # size
1047 parsizes = []
1045 parsizes = []
1048 for key, value in manpar:
1046 for key, value in manpar:
1049 parsizes.append(len(key))
1047 parsizes.append(len(key))
1050 parsizes.append(len(value))
1048 parsizes.append(len(value))
1051 for key, value in advpar:
1049 for key, value in advpar:
1052 parsizes.append(len(key))
1050 parsizes.append(len(key))
1053 parsizes.append(len(value))
1051 parsizes.append(len(value))
1054 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1052 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1055 header.append(paramsizes)
1053 header.append(paramsizes)
1056 # key, value
1054 # key, value
1057 for key, value in manpar:
1055 for key, value in manpar:
1058 header.append(key)
1056 header.append(key)
1059 header.append(value)
1057 header.append(value)
1060 for key, value in advpar:
1058 for key, value in advpar:
1061 header.append(key)
1059 header.append(key)
1062 header.append(value)
1060 header.append(value)
1063 ## finalize header
1061 ## finalize header
1064 try:
1062 try:
1065 headerchunk = ''.join(header)
1063 headerchunk = ''.join(header)
1066 except TypeError:
1064 except TypeError:
1067 raise TypeError(r'Found a non-bytes trying to '
1065 raise TypeError(r'Found a non-bytes trying to '
1068 r'build bundle part header: %r' % header)
1066 r'build bundle part header: %r' % header)
1069 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1067 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1070 yield _pack(_fpartheadersize, len(headerchunk))
1068 yield _pack(_fpartheadersize, len(headerchunk))
1071 yield headerchunk
1069 yield headerchunk
1072 ## payload
1070 ## payload
1073 try:
1071 try:
1074 for chunk in self._payloadchunks():
1072 for chunk in self._payloadchunks():
1075 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1073 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1076 yield _pack(_fpayloadsize, len(chunk))
1074 yield _pack(_fpayloadsize, len(chunk))
1077 yield chunk
1075 yield chunk
1078 except GeneratorExit:
1076 except GeneratorExit:
1079 # GeneratorExit means that nobody is listening for our
1077 # GeneratorExit means that nobody is listening for our
1080 # results anyway, so just bail quickly rather than trying
1078 # results anyway, so just bail quickly rather than trying
1081 # to produce an error part.
1079 # to produce an error part.
1082 ui.debug('bundle2-generatorexit\n')
1080 ui.debug('bundle2-generatorexit\n')
1083 raise
1081 raise
1084 except BaseException as exc:
1082 except BaseException as exc:
1085 bexc = util.forcebytestr(exc)
1083 bexc = util.forcebytestr(exc)
1086 # backup exception data for later
1084 # backup exception data for later
1087 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1085 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1088 % bexc)
1086 % bexc)
1089 tb = sys.exc_info()[2]
1087 tb = sys.exc_info()[2]
1090 msg = 'unexpected error: %s' % bexc
1088 msg = 'unexpected error: %s' % bexc
1091 interpart = bundlepart('error:abort', [('message', msg)],
1089 interpart = bundlepart('error:abort', [('message', msg)],
1092 mandatory=False)
1090 mandatory=False)
1093 interpart.id = 0
1091 interpart.id = 0
1094 yield _pack(_fpayloadsize, -1)
1092 yield _pack(_fpayloadsize, -1)
1095 for chunk in interpart.getchunks(ui=ui):
1093 for chunk in interpart.getchunks(ui=ui):
1096 yield chunk
1094 yield chunk
1097 outdebug(ui, 'closing payload chunk')
1095 outdebug(ui, 'closing payload chunk')
1098 # abort current part payload
1096 # abort current part payload
1099 yield _pack(_fpayloadsize, 0)
1097 yield _pack(_fpayloadsize, 0)
1100 pycompat.raisewithtb(exc, tb)
1098 pycompat.raisewithtb(exc, tb)
1101 # end of payload
1099 # end of payload
1102 outdebug(ui, 'closing payload chunk')
1100 outdebug(ui, 'closing payload chunk')
1103 yield _pack(_fpayloadsize, 0)
1101 yield _pack(_fpayloadsize, 0)
1104 self._generated = True
1102 self._generated = True
1105
1103
1106 def _payloadchunks(self):
1104 def _payloadchunks(self):
1107 """yield chunks of a the part payload
1105 """yield chunks of a the part payload
1108
1106
1109 Exists to handle the different methods to provide data to a part."""
1107 Exists to handle the different methods to provide data to a part."""
1110 # we only support fixed size data now.
1108 # we only support fixed size data now.
1111 # This will be improved in the future.
1109 # This will be improved in the future.
1112 if (util.safehasattr(self.data, 'next')
1110 if (util.safehasattr(self.data, 'next')
1113 or util.safehasattr(self.data, '__next__')):
1111 or util.safehasattr(self.data, '__next__')):
1114 buff = util.chunkbuffer(self.data)
1112 buff = util.chunkbuffer(self.data)
1115 chunk = buff.read(preferedchunksize)
1113 chunk = buff.read(preferedchunksize)
1116 while chunk:
1114 while chunk:
1117 yield chunk
1115 yield chunk
1118 chunk = buff.read(preferedchunksize)
1116 chunk = buff.read(preferedchunksize)
1119 elif len(self.data):
1117 elif len(self.data):
1120 yield self.data
1118 yield self.data
1121
1119
1122
1120
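For reference, the header block that ``getchunks`` emits above can be sketched as a standalone encoder. This is a minimal sketch, not part of the module: ``encodepartheader`` is a hypothetical helper, and the inlined format strings (``'>B'`` type size, ``'>I'`` part id, ``'>BB'`` parameter counts, ``'>i'`` header-size prefix) are assumed to match the module-level ``_fparttypesize``, ``_fpartid``, ``_fpartparamcount`` and ``_fpartheadersize`` constants.

```python
import struct

def encodepartheader(parttype, partid, manpar, advpar):
    """Sketch of the bundle2 part header layout built by getchunks().

    Hypothetical helper; format strings are assumptions mirroring the
    module constants, not imports from this module.
    """
    header = [struct.pack('>B', len(parttype)), parttype,
              struct.pack('>I', partid),
              # mandatory and advisory parameter counts
              struct.pack('>BB', len(manpar), len(advpar))]
    # one byte per key size and per value size, mandatory params first
    sizes = []
    for key, value in manpar + advpar:
        sizes.append(len(key))
        sizes.append(len(value))
    header.append(struct.pack('>' + 'B' * len(sizes), *sizes))
    # then the raw key/value bytes in the same order
    for key, value in manpar + advpar:
        header.append(key)
        header.append(value)
    headerchunk = b''.join(header)
    # the header block itself is length-prefixed on the stream
    return struct.pack('>i', len(headerchunk)) + headerchunk
```

A mandatory part advertises itself by upper-casing its type on the wire, which is why ``getchunks`` only ever writes ``self.type.upper()`` or ``self.type.lower()``.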
flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting exceptions raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = seekableunbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        hardabort = False
        try:
            _processpart(op, part)
        except (SystemExit, KeyboardInterrupt):
            hardabort = True
            raise
        finally:
            if not hardabort:
                part.consume()
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

def decodepayloadchunks(ui, fh):
    """Read bundle2 part payload data into chunks.

    Part payload data consists of framed chunks. This function takes
    a file handle and emits those chunks.
    """
    headersize = struct.calcsize(_fpayloadsize)
    readexactly = changegroup.readexactly

    chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0]
    indebug(ui, 'payload chunk size: %i' % chunksize)

    while chunksize:
        if chunksize >= 0:
            yield readexactly(fh, chunksize)
        elif chunksize == flaginterrupt:
            # Interrupt "signal" detected. The regular stream is interrupted
            # and a bundle2 part follows. Consume it.
            interrupthandler(ui, fh)()
        else:
            raise error.BundleValueError(
                'negative payload chunk size: %s' % chunksize)

        chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0]
        indebug(ui, 'payload chunk size: %i' % chunksize)

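The framing that ``decodepayloadchunks`` consumes is simple enough to round-trip in a few lines. The following is a self-contained sketch under the assumption that ``'>i'`` matches the module's ``_fpayloadsize``; it deliberately ignores the interrupt machinery (a ``-1`` frame) handled above.

```python
import io
import struct

_FPAYLOADSIZE = '>i'  # assumed equal to the module's _fpayloadsize

def encodepayload(chunks):
    """Frame chunks as a bundle2 payload: each chunk is prefixed with a
    signed 32-bit big-endian length, and the payload ends with a
    zero-length frame."""
    out = []
    for chunk in chunks:
        out.append(struct.pack(_FPAYLOADSIZE, len(chunk)))
        out.append(chunk)
    out.append(struct.pack(_FPAYLOADSIZE, 0))
    return b''.join(out)

def decodepayload(fh):
    """Inverse of encodepayload(); stops at the zero-length frame.

    Interrupt frames (size == -1) are out of scope for this sketch."""
    while True:
        size = struct.unpack(_FPAYLOADSIZE, fh.read(4))[0]
        if size == 0:
            return
        if size < 0:
            raise ValueError('negative frame not handled here: %d' % size)
        yield fh.read(size)
```

The signed length field is what makes the in-band interrupt possible: a negative size can never be a real chunk, so ``-1`` is free to mean "a nested part follows".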
class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._readheader()
        self._mandatory = None
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic-related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of pairs again
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def _payloadchunks(self):
        """Generator of decoded chunks in the payload."""
        return decodepayloadchunks(self.ui, self._fp)

    def consume(self):
        """Read the part payload until completion.

        By consuming the part data, the underlying stream read offset will
        be advanced to the next part (or end of stream).
        """
        if self.consumed:
            return

        chunk = self.read(32768)
        while chunk:
            self._pos += len(chunk)
            chunk = self.read(32768)

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

class seekableunbundlepart(unbundlepart):
    """A bundle2 part in a bundle that is seekable.

    Regular ``unbundlepart`` instances can only be read once. This class
    extends ``unbundlepart`` to enable bi-directional seeking within the
    part.

    Bundle2 part data consists of framed chunks. Offsets when seeking
    refer to the decoded data, not the offsets in the underlying bundle2
    stream.

    To facilitate quickly seeking within the decoded data, instances of this
    class maintain a mapping between offsets in the underlying stream and
    the decoded payload. This mapping will consume memory in proportion
    to the number of chunks within the payload (which almost certainly
    increases in proportion with the size of the part).
    """
    def __init__(self, ui, header, fp):
        # (payload, file) offsets for chunk starts.
        self._chunkindex = []

        super(seekableunbundlepart, self).__init__(ui, header, fp)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]

        for chunk in decodepayloadchunks(self.ui, self._fp):
            chunknum += 1
            pos += len(chunk)
            if chunknum == len(self._chunkindex):
                self._chunkindex.append((pos, self._tellfp()))

            yield chunk

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def tell(self):
        return self._pos

    def seek(self, offset, whence=os.SEEK_SET):
        if whence == os.SEEK_SET:
            newpos = offset
        elif whence == os.SEEK_CUR:
            newpos = self._pos + offset
        elif whence == os.SEEK_END:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

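The (payload offset, file offset) index maintained by ``seekableunbundlepart`` is what makes ``_findchunk`` cheap. The same lookup can be written as a pure function; the sketch below mirrors the method's logic on a plain sorted list (``findchunk`` is a hypothetical name, not part of this module).

```python
def findchunk(chunkindex, pos):
    """Given a sorted list of (payloadoffset, fileoffset) chunk starts,
    return (chunknumber, offset-within-chunk) for a payload position.

    Standalone mirror of seekableunbundlepart._findchunk; like the
    original, positions past the last recorded chunk start raise
    ValueError (the caller is expected to bounds-check first)."""
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            # position lands exactly on a chunk boundary
            return chunk, 0
        elif ppos > pos:
            # position falls inside the previous chunk
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')
```

A linear scan is fine here because the index only grows by one entry per decoded chunk, and seeks typically target recently indexed regions.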
# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
                'phases': ('heads',),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
        caps.pop('phases')
    return caps

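A hedged, standalone sketch (illustrative names only) of the copy-then-mutate pattern `getrepocaps()` uses above: start from the static capability table, then add or drop entries per request, so the module-level dict is never modified in place.

```python
# 'static_caps' and 'repo_caps' are made-up names standing in for
# bundle2's 'capabilities' dict and 'getrepocaps()'.
static_caps = {'HG20': (), 'phases': ('heads',)}

def repo_caps(allow_pushback=False, legacy_phases=False):
    caps = static_caps.copy()
    if allow_pushback:
        caps['pushback'] = ()   # advertise pushback support
    if legacy_phases:
        caps.pop('phases')      # behave like a pre-phases server
    return caps

print(sorted(repo_caps(allow_pushback=True, legacy_phases=True)))
# ['HG20', 'pushback']
```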
def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

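A standalone restatement of the parsing `obsmarkersversion()` does above: capability entries look like `'V0'`, `'V1'`, and anything not starting with `'V'` is ignored.

```python
# Same comprehension as obsmarkersversion(), lifted out so it can be
# exercised on a plain tuple.
def parse_obsmarker_versions(obscaps):
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

print(parse_obsmarker_versions(('V0', 'V1', 'exp-format')))  # [0, 1]
```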
def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.makechangegroup(repo, outgoing, '01', source)
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The type of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we always want a changegroup in such bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                      mandatory=False)
    if opts.get('phases') and repo.revs('%ln and secret()',
                                        outgoing.missingheads):
        part.addparam('targetphase', '%d' % phases.secret, mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = phases.binaryencode(headsbyphase)
        bundler.newpart('phase-heads', data=phasedata)

def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changeset
    # (as an optional parts)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

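A minimal sketch of the payload layout built above: concatenated (node, fnode) pairs of 20 raw bytes each, with no framing, so a reader recovers them by slicing 40-byte records. The helper names are illustrative, not part of bundle2.

```python
# encode_pairs/decode_pairs mirror the chunks.extend([node, fnode]) /
# ''.join(chunks) pattern used for the 'hgtagsfnodes' part payload.
def encode_pairs(pairs):
    chunks = []
    for node, fnode in pairs:
        assert len(node) == 20 and len(fnode) == 20
        chunks.extend([node, fnode])
    return b''.join(chunks)

def decode_pairs(data):
    # Each record is 40 bytes: a 20-byte node followed by its fnode.
    return [(data[i:i + 20], data[i + 20:i + 40])
            for i in range(0, len(data), 40)]

blob = encode_pairs([(b'n' * 20, b'f' * 20)])
print(len(blob))  # 40
```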
def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only supports v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

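A standalone restatement of the folding arithmetic above, under my reading of the return convention: 0 means failure, a value above 1 encodes added heads as `ret - 1`, a value below -1 encodes removed heads as `ret + 1`, and several results fold into one value with the same shape.

```python
# 'combine' duplicates combinechangegroupresults()' arithmetic on a
# plain list instead of op.records, so it can be exercised directly.
def combine(results):
    changedheads = 0
    result = 1
    for ret in results:
        if ret == 0:        # any failure wins
            return 0
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

print(combine([3, 2]))  # 4: (3-1) + (2-1) = 3 extra heads in total
```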
@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
                             'targetphase'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will see massive rework before
    being inflicted on any end-user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the one contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    extrakwargs = {}
    targetphase = inpart.params.get('targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one need to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                 ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate what was
        retrieved by the client matches the server knowledge about the bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest with
        that name. Like the size, it is used to validate what was retrieved by
        the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one need to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

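A hedged sketch of the validation `util.digestchecker` performs for the part above: the bytes retrieved are counted and hashed, and validation fails if either the size or any declared digest disagrees with the part parameters. `validate_blob` is an illustrative name; the real checker validates incrementally while streaming rather than on a whole buffer.

```python
import hashlib

def validate_blob(data, size, digests):
    # Size check mirrors the 'size' param; digest checks mirror the
    # 'digest:<type>' params (hex digests keyed by digest type).
    if len(data) != size:
        return False
    for typ, expected in digests.items():
        if hashlib.new(typ, data).hexdigest() != expected:
            return False
    return True

blob = b'bundle-bytes'
good = {'sha1': hashlib.sha1(blob).hexdigest()}
print(validate_blob(blob, len(blob), good))  # True
```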
@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that head of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

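A minimal sketch of the wire format read above: the part payload is a plain sequence of 20-byte node hashes with no count prefix, read until EOF. `io.BytesIO` stands in for the bundle2 part reader, and `read_heads` is an illustrative name.

```python
import io

def read_heads(fp):
    heads = []
    h = fp.read(20)
    while len(h) == 20:
        heads.append(h)
        h = fp.read(20)
    # Payload must be a whole number of 20-byte records.
    assert not h
    return heads

payload = b'a' * 20 + b'b' * 20
print(len(read_heads(io.BytesIO(payload))))  # 2
```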
@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually updated
    during the push. If other activities happen on unrelated heads, they are
    ignored.

    This allows servers with high traffic to avoid push contention as long as
    only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('check:phases')
def handlecheckphases(op, inpart):
    """check that phase boundaries of the repository did not change

    This is used to detect a push race.
    """
    phasetonodes = phases.binarydecode(inpart)
    unfi = op.repo.unfiltered()
    cl = unfi.changelog
    phasecache = unfi._phasecache
    msg = ('repository changed while pushing - please try again '
           '(%s is %s expected %s)')
    for expectedphase, nodes in enumerate(phasetonodes):
        for n in nodes:
            actualphase = phasecache.phase(unfi, cl.rev(n))
            if actualphase != expectedphase:
                finalmsg = msg % (nodemod.short(n),
                                  phases.phasenames[actualphase],
                                  phases.phasenames[expectedphase])
                raise error.PushRaced(finalmsg)

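A standalone sketch of the race check above: `phasetonodes` is indexed by expected phase number, and any node whose actual phase differs triggers the push-race error. Here a plain dict stands in for the phase cache, and `find_phase_races` is a made-up helper that collects offenders instead of raising.

```python
def find_phase_races(phasetonodes, actual_phase_of):
    # phasetonodes[expectedphase] lists the nodes the pusher believes
    # are in that phase; mismatches are races.
    races = []
    for expectedphase, nodes in enumerate(phasetonodes):
        for n in nodes:
            if actual_phase_of[n] != expectedphase:
                races.append(n)
    return races

actual = {'n1': 0, 'n2': 2}
print(find_phase_races([['n1'], ['n2']], actual))  # ['n2']
```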
@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

1865 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1863 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1866 def handleerrorunsupportedcontent(op, inpart):
1864 def handleerrorunsupportedcontent(op, inpart):
1867 """Used to transmit unknown content error over the wire"""
1865 """Used to transmit unknown content error over the wire"""
1868 kwargs = {}
1866 kwargs = {}
1869 parttype = inpart.params.get('parttype')
1867 parttype = inpart.params.get('parttype')
1870 if parttype is not None:
1868 if parttype is not None:
1871 kwargs['parttype'] = parttype
1869 kwargs['parttype'] = parttype
1872 params = inpart.params.get('params')
1870 params = inpart.params.get('params')
1873 if params is not None:
1871 if params is not None:
1874 kwargs['params'] = params.split('\0')
1872 kwargs['params'] = params.split('\0')
1875
1873
1876 raise error.BundleUnknownFeatureError(**kwargs)
1874 raise error.BundleUnknownFeatureError(**kwargs)
1877
1875
1878 @parthandler('error:pushraced', ('message',))
1876 @parthandler('error:pushraced', ('message',))
1879 def handleerrorpushraced(op, inpart):
1877 def handleerrorpushraced(op, inpart):
1880 """Used to transmit push race error over the wire"""
1878 """Used to transmit push race error over the wire"""
1881 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1879 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1882
1880
1883 @parthandler('listkeys', ('namespace',))
1881 @parthandler('listkeys', ('namespace',))
1884 def handlelistkeys(op, inpart):
1882 def handlelistkeys(op, inpart):
1885 """retrieve pushkey namespace content stored in a bundle2"""
1883 """retrieve pushkey namespace content stored in a bundle2"""
1886 namespace = inpart.params['namespace']
1884 namespace = inpart.params['namespace']
1887 r = pushkey.decodekeys(inpart.read())
1885 r = pushkey.decodekeys(inpart.read())
1888 op.records.add('listkeys', (namespace, r))
1886 op.records.add('listkeys', (namespace, r))
1889
1887
1890 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1888 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1891 def handlepushkey(op, inpart):
1889 def handlepushkey(op, inpart):
1892 """process a pushkey request"""
1890 """process a pushkey request"""
1893 dec = pushkey.decode
1891 dec = pushkey.decode
1894 namespace = dec(inpart.params['namespace'])
1892 namespace = dec(inpart.params['namespace'])
1895 key = dec(inpart.params['key'])
1893 key = dec(inpart.params['key'])
1896 old = dec(inpart.params['old'])
1894 old = dec(inpart.params['old'])
1897 new = dec(inpart.params['new'])
1895 new = dec(inpart.params['new'])
1898 # Grab the transaction to ensure that we have the lock before performing the
1896 # Grab the transaction to ensure that we have the lock before performing the
1899 # pushkey.
1897 # pushkey.
1900 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1898 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1901 op.gettransaction()
1899 op.gettransaction()
1902 ret = op.repo.pushkey(namespace, key, old, new)
1900 ret = op.repo.pushkey(namespace, key, old, new)
1903 record = {'namespace': namespace,
1901 record = {'namespace': namespace,
1904 'key': key,
1902 'key': key,
1905 'old': old,
1903 'old': old,
1906 'new': new}
1904 'new': new}
1907 op.records.add('pushkey', record)
1905 op.records.add('pushkey', record)
1908 if op.reply is not None:
1906 if op.reply is not None:
1909 rpart = op.reply.newpart('reply:pushkey')
1907 rpart = op.reply.newpart('reply:pushkey')
1910 rpart.addparam(
1908 rpart.addparam(
1911 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1909 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1912 rpart.addparam('return', '%i' % ret, mandatory=False)
1910 rpart.addparam('return', '%i' % ret, mandatory=False)
1913 if inpart.mandatory and not ret:
1911 if inpart.mandatory and not ret:
1914 kwargs = {}
1912 kwargs = {}
1915 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1913 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1916 if key in inpart.params:
1914 if key in inpart.params:
1917 kwargs[key] = inpart.params[key]
1915 kwargs[key] = inpart.params[key]
1918 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1916 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1919
1917
1920 @parthandler('phase-heads')
1918 @parthandler('phase-heads')
1921 def handlephases(op, inpart):
1919 def handlephases(op, inpart):
1922 """apply phases from bundle part to repo"""
1920 """apply phases from bundle part to repo"""
1923 headsbyphase = phases.binarydecode(inpart)
1921 headsbyphase = phases.binarydecode(inpart)
1924 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
1922 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
1925
1923
1926 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1924 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1927 def handlepushkeyreply(op, inpart):
1925 def handlepushkeyreply(op, inpart):
1928 """retrieve the result of a pushkey request"""
1926 """retrieve the result of a pushkey request"""
1929 ret = int(inpart.params['return'])
1927 ret = int(inpart.params['return'])
1930 partid = int(inpart.params['in-reply-to'])
1928 partid = int(inpart.params['in-reply-to'])
1931 op.records.add('pushkey', {'return': ret}, partid)
1929 op.records.add('pushkey', {'return': ret}, partid)
1932
1930
1933 @parthandler('obsmarkers')
1931 @parthandler('obsmarkers')
1934 def handleobsmarker(op, inpart):
1932 def handleobsmarker(op, inpart):
1935 """add a stream of obsmarkers to the repo"""
1933 """add a stream of obsmarkers to the repo"""
1936 tr = op.gettransaction()
1934 tr = op.gettransaction()
1937 markerdata = inpart.read()
1935 markerdata = inpart.read()
1938 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
1936 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
1939 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1937 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1940 % len(markerdata))
1938 % len(markerdata))
1941 # The mergemarkers call will crash if marker creation is not enabled.
1939 # The mergemarkers call will crash if marker creation is not enabled.
1942 # we want to avoid this if the part is advisory.
1940 # we want to avoid this if the part is advisory.
1943 if not inpart.mandatory and op.repo.obsstore.readonly:
1941 if not inpart.mandatory and op.repo.obsstore.readonly:
1944 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1942 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1945 return
1943 return
1946 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1944 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1947 op.repo.invalidatevolatilesets()
1945 op.repo.invalidatevolatilesets()
1948 if new:
1946 if new:
1949 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1947 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1950 op.records.add('obsmarkers', {'new': new})
1948 op.records.add('obsmarkers', {'new': new})
1951 if op.reply is not None:
1949 if op.reply is not None:
1952 rpart = op.reply.newpart('reply:obsmarkers')
1950 rpart = op.reply.newpart('reply:obsmarkers')
1953 rpart.addparam(
1951 rpart.addparam(
1954 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1952 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1955 rpart.addparam('new', '%i' % new, mandatory=False)
1953 rpart.addparam('new', '%i' % new, mandatory=False)
1956
1954
1957
1955
1958 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1956 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1959 def handleobsmarkerreply(op, inpart):
1957 def handleobsmarkerreply(op, inpart):
1960 """retrieve the result of a pushkey request"""
1958 """retrieve the result of a pushkey request"""
1961 ret = int(inpart.params['new'])
1959 ret = int(inpart.params['new'])
1962 partid = int(inpart.params['in-reply-to'])
1960 partid = int(inpart.params['in-reply-to'])
1963 op.records.add('obsmarkers', {'new': ret}, partid)
1961 op.records.add('obsmarkers', {'new': ret}, partid)
1964
1962
1965 @parthandler('hgtagsfnodes')
1963 @parthandler('hgtagsfnodes')
1966 def handlehgtagsfnodes(op, inpart):
1964 def handlehgtagsfnodes(op, inpart):
1967 """Applies .hgtags fnodes cache entries to the local repo.
1965 """Applies .hgtags fnodes cache entries to the local repo.
1968
1966
1969 Payload is pairs of 20 byte changeset nodes and filenodes.
1967 Payload is pairs of 20 byte changeset nodes and filenodes.
1970 """
1968 """
1971 # Grab the transaction so we ensure that we have the lock at this point.
1969 # Grab the transaction so we ensure that we have the lock at this point.
1972 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1970 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1973 op.gettransaction()
1971 op.gettransaction()
1974 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1972 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1975
1973
1976 count = 0
1974 count = 0
1977 while True:
1975 while True:
1978 node = inpart.read(20)
1976 node = inpart.read(20)
1979 fnode = inpart.read(20)
1977 fnode = inpart.read(20)
1980 if len(node) < 20 or len(fnode) < 20:
1978 if len(node) < 20 or len(fnode) < 20:
1981 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1979 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1982 break
1980 break
1983 cache.setfnode(node, fnode)
1981 cache.setfnode(node, fnode)
1984 count += 1
1982 count += 1
1985
1983
1986 cache.write()
1984 cache.write()
1987 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1985 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1988
1986
1989 @parthandler('pushvars')
1987 @parthandler('pushvars')
1990 def bundle2getvars(op, part):
1988 def bundle2getvars(op, part):
1991 '''unbundle a bundle2 containing shellvars on the server'''
1989 '''unbundle a bundle2 containing shellvars on the server'''
1992 # An option to disable unbundling on server-side for security reasons
1990 # An option to disable unbundling on server-side for security reasons
1993 if op.ui.configbool('push', 'pushvars.server'):
1991 if op.ui.configbool('push', 'pushvars.server'):
1994 hookargs = {}
1992 hookargs = {}
1995 for key, value in part.advisoryparams:
1993 for key, value in part.advisoryparams:
1996 key = key.upper()
1994 key = key.upper()
1997 # We want pushed variables to have USERVAR_ prepended so we know
1995 # We want pushed variables to have USERVAR_ prepended so we know
1998 # they came from the --pushvar flag.
1996 # they came from the --pushvar flag.
1999 key = "USERVAR_" + key
1997 key = "USERVAR_" + key
2000 hookargs[key] = value
1998 hookargs[key] = value
2001 op.addhookargs(hookargs)
1999 op.addhookargs(hookargs)
@@ -1,590 +1,598 @@
# bundlerepo.py - repository class for viewing uncompressed bundles
#
# Copyright 2006, 2007 Benoit Boissinot <bboissin@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

"""Repository class for viewing uncompressed bundles.

This provides a read-only repository interface to bundles as if they
were part of the actual repository.
"""

from __future__ import absolute_import

import os
import shutil
import tempfile

from .i18n import _
from .node import nullid

from . import (
    bundle2,
    changegroup,
    changelog,
    cmdutil,
    discovery,
    error,
    exchange,
    filelog,
    localrepo,
    manifest,
    mdiff,
    node as nodemod,
    pathutil,
    phases,
    pycompat,
    revlog,
    util,
    vfs as vfsmod,
)

class bundlerevlog(revlog.revlog):
    def __init__(self, opener, indexfile, cgunpacker, linkmapper):
        # How it works:
        # To retrieve a revision, we need to know the offset of the revision in
        # the bundle (an unbundle object). We store this offset in the index
        # (start). The base of the delta is stored in the base field.
        #
        # To differentiate a rev in the bundle from a rev in the revlog, we
        # check revision against repotiprev.
        opener = vfsmod.readonlyvfs(opener)
        revlog.revlog.__init__(self, opener, indexfile)
        self.bundle = cgunpacker
        n = len(self)
        self.repotiprev = n - 1
        self.bundlerevs = set() # used by 'bundle()' revset expression
        for deltadata in cgunpacker.deltaiter():
            node, p1, p2, cs, deltabase, delta, flags = deltadata

            size = len(delta)
            start = cgunpacker.tell() - size

            link = linkmapper(cs)
            if node in self.nodemap:
                # this can happen if two branches make the same change
                self.bundlerevs.add(self.nodemap[node])
                continue

            for p in (p1, p2):
                if p not in self.nodemap:
                    raise error.LookupError(p, self.indexfile,
                                            _("unknown parent"))

            if deltabase not in self.nodemap:
                raise LookupError(deltabase, self.indexfile,
                                  _('unknown delta base'))

            baserev = self.rev(deltabase)
            # start, size, full unc. size, base (unused), link, p1, p2, node
            e = (revlog.offset_type(start, flags), size, -1, baserev, link,
                 self.rev(p1), self.rev(p2), node)
            self.index.insert(-1, e)
            self.nodemap[node] = n
            self.bundlerevs.add(n)
            n += 1

    def _chunk(self, rev, df=None):
        # Warning: in case of bundle, the diff is against what we stored as
        # delta base, not against rev - 1
        # XXX: could use some caching
        if rev <= self.repotiprev:
            return revlog.revlog._chunk(self, rev)
        self.bundle.seek(self.start(rev))
        return self.bundle.read(self.length(rev))

    def revdiff(self, rev1, rev2):
        """return or calculate a delta between two revisions"""
        if rev1 > self.repotiprev and rev2 > self.repotiprev:
            # hot path for bundle
            revb = self.index[rev2][3]
            if revb == rev1:
                return self._chunk(rev2)
        elif rev1 <= self.repotiprev and rev2 <= self.repotiprev:
            return revlog.revlog.revdiff(self, rev1, rev2)

        return mdiff.textdiff(self.revision(rev1, raw=True),
                              self.revision(rev2, raw=True))

    def revision(self, nodeorrev, _df=None, raw=False):
        """return an uncompressed revision of a given node or revision
        number.
        """
        if isinstance(nodeorrev, int):
            rev = nodeorrev
            node = self.node(rev)
        else:
            node = nodeorrev
            rev = self.rev(node)

        if node == nullid:
            return ""

        rawtext = None
        chain = []
        iterrev = rev
        # reconstruct the revision if it is from a changegroup
        while iterrev > self.repotiprev:
            if self._cache and self._cache[1] == iterrev:
                rawtext = self._cache[2]
                break
            chain.append(iterrev)
            iterrev = self.index[iterrev][3]
        if rawtext is None:
            rawtext = self.baserevision(iterrev)

        while chain:
            delta = self._chunk(chain.pop())
            rawtext = mdiff.patches(rawtext, [delta])

        text, validatehash = self._processflags(rawtext, self.flags(rev),
                                                'read', raw=raw)
        if validatehash:
            self.checkhash(text, node, rev=rev)
        self._cache = (node, rev, rawtext)
        return text

    def baserevision(self, nodeorrev):
        # Revlog subclasses may override 'revision' method to modify format of
        # content retrieved from revlog. To use bundlerevlog with such class one
        # needs to override 'baserevision' and make more specific call here.
        return revlog.revlog.revision(self, nodeorrev, raw=True)

    def addrevision(self, *args, **kwargs):
        raise NotImplementedError

    def addgroup(self, *args, **kwargs):
        raise NotImplementedError

    def strip(self, *args, **kwargs):
        raise NotImplementedError

    def checksize(self):
        raise NotImplementedError

class bundlechangelog(bundlerevlog, changelog.changelog):
    def __init__(self, opener, cgunpacker):
        changelog.changelog.__init__(self, opener)
        linkmapper = lambda x: x
        bundlerevlog.__init__(self, opener, self.indexfile, cgunpacker,
                              linkmapper)

    def baserevision(self, nodeorrev):
        # Although changelog doesn't override 'revision' method, some extensions
        # may replace this class with another that does. Same story with
        # manifest and filelog classes.

        # This bypasses filtering on changelog.node() and rev() because we need
        # revision text of the bundle base even if it is hidden.
        oldfilter = self.filteredrevs
        try:
            self.filteredrevs = ()
            return changelog.changelog.revision(self, nodeorrev, raw=True)
        finally:
            self.filteredrevs = oldfilter

class bundlemanifest(bundlerevlog, manifest.manifestrevlog):
    def __init__(self, opener, cgunpacker, linkmapper, dirlogstarts=None,
                 dir=''):
        manifest.manifestrevlog.__init__(self, opener, dir=dir)
        bundlerevlog.__init__(self, opener, self.indexfile, cgunpacker,
                              linkmapper)
        if dirlogstarts is None:
            dirlogstarts = {}
            if self.bundle.version == "03":
                dirlogstarts = _getfilestarts(self.bundle)
        self._dirlogstarts = dirlogstarts
        self._linkmapper = linkmapper

    def baserevision(self, nodeorrev):
        node = nodeorrev
        if isinstance(node, int):
            node = self.node(node)

        if node in self.fulltextcache:
            result = '%s' % self.fulltextcache[node]
        else:
            result = manifest.manifestrevlog.revision(self, nodeorrev, raw=True)
        return result

    def dirlog(self, d):
        if d in self._dirlogstarts:
            self.bundle.seek(self._dirlogstarts[d])
            return bundlemanifest(
                self.opener, self.bundle, self._linkmapper,
                self._dirlogstarts, dir=d)
        return super(bundlemanifest, self).dirlog(d)

class bundlefilelog(bundlerevlog, filelog.filelog):
    def __init__(self, opener, path, cgunpacker, linkmapper):
        filelog.filelog.__init__(self, opener, path)
        bundlerevlog.__init__(self, opener, self.indexfile, cgunpacker,
                              linkmapper)

    def baserevision(self, nodeorrev):
        return filelog.filelog.revision(self, nodeorrev, raw=True)

class bundlepeer(localrepo.localpeer):
    def canpush(self):
        return False

class bundlephasecache(phases.phasecache):
    def __init__(self, *args, **kwargs):
        super(bundlephasecache, self).__init__(*args, **kwargs)
        if util.safehasattr(self, 'opener'):
            self.opener = vfsmod.readonlyvfs(self.opener)

    def write(self):
        raise NotImplementedError

    def _write(self, fp):
        raise NotImplementedError

    def _updateroots(self, phase, newroots, tr):
        self.phaseroots[phase] = newroots
        self.invalidate()
        self.dirty = True

def _getfilestarts(cgunpacker):
    filespos = {}
    for chunkdata in iter(cgunpacker.filelogheader, {}):
        fname = chunkdata['filename']
        filespos[fname] = cgunpacker.tell()
        for chunk in iter(lambda: cgunpacker.deltachunk(None), {}):
            pass
    return filespos

class bundlerepository(localrepo.localrepository):
    """A repository instance that is a union of a local repo and a bundle.

    Instances represent a read-only repository composed of a local repository
    with the contents of a bundle file applied. The repository instance is
    conceptually similar to the state of a repository after an
    ``hg unbundle`` operation. However, the contents of the bundle are never
    applied to the actual base repository.
    """
    def __init__(self, ui, repopath, bundlepath):
        self._tempparent = None
        try:
            localrepo.localrepository.__init__(self, ui, repopath)
        except error.RepoError:
            self._tempparent = tempfile.mkdtemp()
            localrepo.instance(ui, self._tempparent, 1)
            localrepo.localrepository.__init__(self, ui, self._tempparent)
        self.ui.setconfig('phases', 'publish', False, 'bundlerepo')

        if repopath:
            self._url = 'bundle:' + util.expandpath(repopath) + '+' + bundlepath
        else:
            self._url = 'bundle:' + bundlepath

        self.tempfile = None
        f = util.posixfile(bundlepath, "rb")
        bundle = exchange.readbundle(ui, f, bundlepath)

        if isinstance(bundle, bundle2.unbundle20):
            self._bundlefile = bundle
            self._cgunpacker = None

-            hadchangegroup = False
+            cgpart = None
            for part in bundle.iterparts():
                if part.type == 'changegroup':
-                    if hadchangegroup:
+                    if cgpart:
                        raise NotImplementedError("can't process "
                                                  "multiple changegroups")
-                    hadchangegroup = True
+                    cgpart = part

                self._handlebundle2part(bundle, part)

-            if not hadchangegroup:
+            if not cgpart:
                raise error.Abort(_("No changegroups found"))

+            # This is required to placate a later consumer, which expects
+            # the payload offset to be at the beginning of the changegroup.
+            # We need to do this after the iterparts() generator advances
+            # because iterparts() will seek to end of payload after the
+            # generator returns control to iterparts().
+            cgpart.seek(0, os.SEEK_SET)
+
        elif isinstance(bundle, changegroup.cg1unpacker):
            if bundle.compressed():
                f = self._writetempbundle(bundle.read, '.hg10un',
                                          header='HG10UN')
                bundle = exchange.readbundle(ui, f, bundlepath, self.vfs)

            self._bundlefile = bundle
            self._cgunpacker = bundle
        else:
            raise error.Abort(_('bundle type %s cannot be read') %
                              type(bundle))

        # dict with the mapping 'filename' -> position in the changegroup.
        self._cgfilespos = {}

        self.firstnewrev = self.changelog.repotiprev + 1
        phases.retractboundary(self, None, phases.draft,
                               [ctx.node() for ctx in self[self.firstnewrev:]])

    def _handlebundle2part(self, bundle, part):
        if part.type != 'changegroup':
            return

        cgstream = part
        version = part.params.get('version', '01')
        legalcgvers = changegroup.supportedincomingversions(self)
        if version not in legalcgvers:
            msg = _('Unsupported changegroup version: %s')
            raise error.Abort(msg % version)
        if bundle.compressed():
            cgstream = self._writetempbundle(part.read, '.cg%sun' % version)

        self._cgunpacker = changegroup.getunbundler(version, cgstream, 'UN')

    def _writetempbundle(self, readfn, suffix, header=''):
        """Write a temporary file to disk
        """
        fdtemp, temp = self.vfs.mkstemp(prefix="hg-bundle-",
                                        suffix=suffix)
        self.tempfile = temp

        with os.fdopen(fdtemp, pycompat.sysstr('wb')) as fptemp:
            fptemp.write(header)
            while True:
                chunk = readfn(2**18)
                if not chunk:
                    break
                fptemp.write(chunk)

        return self.vfs.open(self.tempfile, mode="rb")
360 return self.vfs.open(self.tempfile, mode="rb")
353
361
354 @localrepo.unfilteredpropertycache
362 @localrepo.unfilteredpropertycache
355 def _phasecache(self):
363 def _phasecache(self):
356 return bundlephasecache(self, self._phasedefaults)
364 return bundlephasecache(self, self._phasedefaults)
357
365
358 @localrepo.unfilteredpropertycache
366 @localrepo.unfilteredpropertycache
359 def changelog(self):
367 def changelog(self):
360 # consume the header if it exists
368 # consume the header if it exists
361 self._cgunpacker.changelogheader()
369 self._cgunpacker.changelogheader()
362 c = bundlechangelog(self.svfs, self._cgunpacker)
370 c = bundlechangelog(self.svfs, self._cgunpacker)
363 self.manstart = self._cgunpacker.tell()
371 self.manstart = self._cgunpacker.tell()
364 return c
372 return c
365
373
366 def _constructmanifest(self):
374 def _constructmanifest(self):
367 self._cgunpacker.seek(self.manstart)
375 self._cgunpacker.seek(self.manstart)
368 # consume the header if it exists
376 # consume the header if it exists
369 self._cgunpacker.manifestheader()
377 self._cgunpacker.manifestheader()
370 linkmapper = self.unfiltered().changelog.rev
378 linkmapper = self.unfiltered().changelog.rev
371 m = bundlemanifest(self.svfs, self._cgunpacker, linkmapper)
379 m = bundlemanifest(self.svfs, self._cgunpacker, linkmapper)
372 self.filestart = self._cgunpacker.tell()
380 self.filestart = self._cgunpacker.tell()
373 return m
381 return m
374
382
375 def _consumemanifest(self):
383 def _consumemanifest(self):
376 """Consumes the manifest portion of the bundle, setting filestart so the
384 """Consumes the manifest portion of the bundle, setting filestart so the
377 file portion can be read."""
385 file portion can be read."""
378 self._cgunpacker.seek(self.manstart)
386 self._cgunpacker.seek(self.manstart)
379 self._cgunpacker.manifestheader()
387 self._cgunpacker.manifestheader()
380 for delta in self._cgunpacker.deltaiter():
388 for delta in self._cgunpacker.deltaiter():
381 pass
389 pass
382 self.filestart = self._cgunpacker.tell()
390 self.filestart = self._cgunpacker.tell()
383
391
384 @localrepo.unfilteredpropertycache
392 @localrepo.unfilteredpropertycache
385 def manstart(self):
393 def manstart(self):
386 self.changelog
394 self.changelog
387 return self.manstart
395 return self.manstart
388
396
389 @localrepo.unfilteredpropertycache
397 @localrepo.unfilteredpropertycache
390 def filestart(self):
398 def filestart(self):
391 self.manifestlog
399 self.manifestlog
392
400
393 # If filestart was not set by self.manifestlog, that means the
401 # If filestart was not set by self.manifestlog, that means the
394 # manifestlog implementation did not consume the manifests from the
402 # manifestlog implementation did not consume the manifests from the
395 # changegroup (ex: it might be consuming trees from a separate bundle2
403 # changegroup (ex: it might be consuming trees from a separate bundle2
396 # part instead). So we need to manually consume it.
404 # part instead). So we need to manually consume it.
397 if 'filestart' not in self.__dict__:
405 if 'filestart' not in self.__dict__:
398 self._consumemanifest()
406 self._consumemanifest()
399
407
400 return self.filestart
408 return self.filestart
401
409
402 def url(self):
410 def url(self):
403 return self._url
411 return self._url
404
412
405 def file(self, f):
413 def file(self, f):
406 if not self._cgfilespos:
414 if not self._cgfilespos:
407 self._cgunpacker.seek(self.filestart)
415 self._cgunpacker.seek(self.filestart)
408 self._cgfilespos = _getfilestarts(self._cgunpacker)
416 self._cgfilespos = _getfilestarts(self._cgunpacker)
409
417
410 if f in self._cgfilespos:
418 if f in self._cgfilespos:
411 self._cgunpacker.seek(self._cgfilespos[f])
419 self._cgunpacker.seek(self._cgfilespos[f])
412 linkmapper = self.unfiltered().changelog.rev
420 linkmapper = self.unfiltered().changelog.rev
413 return bundlefilelog(self.svfs, f, self._cgunpacker, linkmapper)
421 return bundlefilelog(self.svfs, f, self._cgunpacker, linkmapper)
414 else:
422 else:
415 return filelog.filelog(self.svfs, f)
423 return filelog.filelog(self.svfs, f)
416
424
417 def close(self):
425 def close(self):
418 """Close assigned bundle file immediately."""
426 """Close assigned bundle file immediately."""
419 self._bundlefile.close()
427 self._bundlefile.close()
420 if self.tempfile is not None:
428 if self.tempfile is not None:
421 self.vfs.unlink(self.tempfile)
429 self.vfs.unlink(self.tempfile)
422 if self._tempparent:
430 if self._tempparent:
423 shutil.rmtree(self._tempparent, True)
431 shutil.rmtree(self._tempparent, True)
424
432
425 def cancopy(self):
433 def cancopy(self):
426 return False
434 return False
427
435
428 def peer(self):
436 def peer(self):
429 return bundlepeer(self)
437 return bundlepeer(self)
430
438
431 def getcwd(self):
439 def getcwd(self):
432 return pycompat.getcwd() # always outside the repo
440 return pycompat.getcwd() # always outside the repo
433
441
434 # Check if parents exist in localrepo before setting
442 # Check if parents exist in localrepo before setting
435 def setparents(self, p1, p2=nullid):
443 def setparents(self, p1, p2=nullid):
436 p1rev = self.changelog.rev(p1)
444 p1rev = self.changelog.rev(p1)
437 p2rev = self.changelog.rev(p2)
445 p2rev = self.changelog.rev(p2)
438 msg = _("setting parent to node %s that only exists in the bundle\n")
446 msg = _("setting parent to node %s that only exists in the bundle\n")
439 if self.changelog.repotiprev < p1rev:
447 if self.changelog.repotiprev < p1rev:
440 self.ui.warn(msg % nodemod.hex(p1))
448 self.ui.warn(msg % nodemod.hex(p1))
441 if self.changelog.repotiprev < p2rev:
449 if self.changelog.repotiprev < p2rev:
442 self.ui.warn(msg % nodemod.hex(p2))
450 self.ui.warn(msg % nodemod.hex(p2))
443 return super(bundlerepository, self).setparents(p1, p2)
451 return super(bundlerepository, self).setparents(p1, p2)
444
452
445 def instance(ui, path, create):
453 def instance(ui, path, create):
446 if create:
454 if create:
447 raise error.Abort(_('cannot create new bundle repository'))
455 raise error.Abort(_('cannot create new bundle repository'))
448 # internal config: bundle.mainreporoot
456 # internal config: bundle.mainreporoot
449 parentpath = ui.config("bundle", "mainreporoot")
457 parentpath = ui.config("bundle", "mainreporoot")
450 if not parentpath:
458 if not parentpath:
451 # try to find the correct path to the working directory repo
459 # try to find the correct path to the working directory repo
452 parentpath = cmdutil.findrepo(pycompat.getcwd())
460 parentpath = cmdutil.findrepo(pycompat.getcwd())
453 if parentpath is None:
461 if parentpath is None:
454 parentpath = ''
462 parentpath = ''
455 if parentpath:
463 if parentpath:
456 # Try to make the full path relative so we get a nice, short URL.
464 # Try to make the full path relative so we get a nice, short URL.
457 # In particular, we don't want temp dir names in test outputs.
465 # In particular, we don't want temp dir names in test outputs.
458 cwd = pycompat.getcwd()
466 cwd = pycompat.getcwd()
459 if parentpath == cwd:
467 if parentpath == cwd:
460 parentpath = ''
468 parentpath = ''
461 else:
469 else:
462 cwd = pathutil.normasprefix(cwd)
470 cwd = pathutil.normasprefix(cwd)
463 if parentpath.startswith(cwd):
471 if parentpath.startswith(cwd):
464 parentpath = parentpath[len(cwd):]
472 parentpath = parentpath[len(cwd):]
465 u = util.url(path)
473 u = util.url(path)
466 path = u.localpath()
474 path = u.localpath()
467 if u.scheme == 'bundle':
475 if u.scheme == 'bundle':
468 s = path.split("+", 1)
476 s = path.split("+", 1)
469 if len(s) == 1:
477 if len(s) == 1:
470 repopath, bundlename = parentpath, s[0]
478 repopath, bundlename = parentpath, s[0]
471 else:
479 else:
472 repopath, bundlename = s
480 repopath, bundlename = s
473 else:
481 else:
474 repopath, bundlename = parentpath, path
482 repopath, bundlename = parentpath, path
475 return bundlerepository(ui, repopath, bundlename)
483 return bundlerepository(ui, repopath, bundlename)
476
484
477 class bundletransactionmanager(object):
485 class bundletransactionmanager(object):
478 def transaction(self):
486 def transaction(self):
479 return None
487 return None
480
488
481 def close(self):
489 def close(self):
482 raise NotImplementedError
490 raise NotImplementedError
483
491
484 def release(self):
492 def release(self):
485 raise NotImplementedError
493 raise NotImplementedError
486
494
487 def getremotechanges(ui, repo, other, onlyheads=None, bundlename=None,
495 def getremotechanges(ui, repo, other, onlyheads=None, bundlename=None,
488 force=False):
496 force=False):
489 '''obtains a bundle of changes incoming from other
497 '''obtains a bundle of changes incoming from other
490
498
491 "onlyheads" restricts the returned changes to those reachable from the
499 "onlyheads" restricts the returned changes to those reachable from the
492 specified heads.
500 specified heads.
493 "bundlename", if given, stores the bundle to this file path permanently;
501 "bundlename", if given, stores the bundle to this file path permanently;
494 otherwise it's stored to a temp file and gets deleted again when you call
502 otherwise it's stored to a temp file and gets deleted again when you call
495 the returned "cleanupfn".
503 the returned "cleanupfn".
496 "force" indicates whether to proceed on unrelated repos.
504 "force" indicates whether to proceed on unrelated repos.
497
505
498 Returns a tuple (local, csets, cleanupfn):
506 Returns a tuple (local, csets, cleanupfn):
499
507
500 "local" is a local repo from which to obtain the actual incoming
508 "local" is a local repo from which to obtain the actual incoming
501 changesets; it is a bundlerepo for the obtained bundle when the
509 changesets; it is a bundlerepo for the obtained bundle when the
502 original "other" is remote.
510 original "other" is remote.
503 "csets" lists the incoming changeset node ids.
511 "csets" lists the incoming changeset node ids.
504 "cleanupfn" must be called without arguments when you're done processing
512 "cleanupfn" must be called without arguments when you're done processing
505 the changes; it closes both the original "other" and the one returned
513 the changes; it closes both the original "other" and the one returned
506 here.
514 here.
507 '''
515 '''
508 tmp = discovery.findcommonincoming(repo, other, heads=onlyheads,
516 tmp = discovery.findcommonincoming(repo, other, heads=onlyheads,
509 force=force)
517 force=force)
510 common, incoming, rheads = tmp
518 common, incoming, rheads = tmp
511 if not incoming:
519 if not incoming:
512 try:
520 try:
513 if bundlename:
521 if bundlename:
514 os.unlink(bundlename)
522 os.unlink(bundlename)
515 except OSError:
523 except OSError:
516 pass
524 pass
517 return repo, [], other.close
525 return repo, [], other.close
518
526
519 commonset = set(common)
527 commonset = set(common)
520 rheads = [x for x in rheads if x not in commonset]
528 rheads = [x for x in rheads if x not in commonset]
521
529
522 bundle = None
530 bundle = None
523 bundlerepo = None
531 bundlerepo = None
524 localrepo = other.local()
532 localrepo = other.local()
525 if bundlename or not localrepo:
533 if bundlename or not localrepo:
526 # create a bundle (uncompressed if other repo is not local)
534 # create a bundle (uncompressed if other repo is not local)
527
535
528 # developer config: devel.legacy.exchange
536 # developer config: devel.legacy.exchange
529 legexc = ui.configlist('devel', 'legacy.exchange')
537 legexc = ui.configlist('devel', 'legacy.exchange')
530 forcebundle1 = 'bundle2' not in legexc and 'bundle1' in legexc
538 forcebundle1 = 'bundle2' not in legexc and 'bundle1' in legexc
531 canbundle2 = (not forcebundle1
539 canbundle2 = (not forcebundle1
532 and other.capable('getbundle')
540 and other.capable('getbundle')
533 and other.capable('bundle2'))
541 and other.capable('bundle2'))
534 if canbundle2:
542 if canbundle2:
535 kwargs = {}
543 kwargs = {}
536 kwargs['common'] = common
544 kwargs['common'] = common
537 kwargs['heads'] = rheads
545 kwargs['heads'] = rheads
538 kwargs['bundlecaps'] = exchange.caps20to10(repo)
546 kwargs['bundlecaps'] = exchange.caps20to10(repo)
539 kwargs['cg'] = True
547 kwargs['cg'] = True
540 b2 = other.getbundle('incoming', **kwargs)
548 b2 = other.getbundle('incoming', **kwargs)
541 fname = bundle = changegroup.writechunks(ui, b2._forwardchunks(),
549 fname = bundle = changegroup.writechunks(ui, b2._forwardchunks(),
542 bundlename)
550 bundlename)
543 else:
551 else:
544 if other.capable('getbundle'):
552 if other.capable('getbundle'):
545 cg = other.getbundle('incoming', common=common, heads=rheads)
553 cg = other.getbundle('incoming', common=common, heads=rheads)
546 elif onlyheads is None and not other.capable('changegroupsubset'):
554 elif onlyheads is None and not other.capable('changegroupsubset'):
547 # compat with older servers when pulling all remote heads
555 # compat with older servers when pulling all remote heads
548 cg = other.changegroup(incoming, "incoming")
556 cg = other.changegroup(incoming, "incoming")
549 rheads = None
557 rheads = None
550 else:
558 else:
551 cg = other.changegroupsubset(incoming, rheads, 'incoming')
559 cg = other.changegroupsubset(incoming, rheads, 'incoming')
552 if localrepo:
560 if localrepo:
553 bundletype = "HG10BZ"
561 bundletype = "HG10BZ"
554 else:
562 else:
555 bundletype = "HG10UN"
563 bundletype = "HG10UN"
556 fname = bundle = bundle2.writebundle(ui, cg, bundlename,
564 fname = bundle = bundle2.writebundle(ui, cg, bundlename,
557 bundletype)
565 bundletype)
558 # keep written bundle?
566 # keep written bundle?
559 if bundlename:
567 if bundlename:
560 bundle = None
568 bundle = None
561 if not localrepo:
569 if not localrepo:
562 # use the created uncompressed bundlerepo
570 # use the created uncompressed bundlerepo
563 localrepo = bundlerepo = bundlerepository(repo.baseui, repo.root,
571 localrepo = bundlerepo = bundlerepository(repo.baseui, repo.root,
564 fname)
572 fname)
565 # this repo contains local and other now, so filter out local again
573 # this repo contains local and other now, so filter out local again
566 common = repo.heads()
574 common = repo.heads()
567 if localrepo:
575 if localrepo:
568 # Part of common may be remotely filtered
576 # Part of common may be remotely filtered
569 # So use an unfiltered version
577 # So use an unfiltered version
570 # The discovery process probably need cleanup to avoid that
578 # The discovery process probably need cleanup to avoid that
571 localrepo = localrepo.unfiltered()
579 localrepo = localrepo.unfiltered()
572
580
573 csets = localrepo.changelog.findmissing(common, rheads)
581 csets = localrepo.changelog.findmissing(common, rheads)
574
582
575 if bundlerepo:
583 if bundlerepo:
576 reponodes = [ctx.node() for ctx in bundlerepo[bundlerepo.firstnewrev:]]
584 reponodes = [ctx.node() for ctx in bundlerepo[bundlerepo.firstnewrev:]]
577 remotephases = other.listkeys('phases')
585 remotephases = other.listkeys('phases')
578
586
579 pullop = exchange.pulloperation(bundlerepo, other, heads=reponodes)
587 pullop = exchange.pulloperation(bundlerepo, other, heads=reponodes)
580 pullop.trmanager = bundletransactionmanager()
588 pullop.trmanager = bundletransactionmanager()
581 exchange._pullapplyphases(pullop, remotephases)
589 exchange._pullapplyphases(pullop, remotephases)
582
590
583 def cleanup():
591 def cleanup():
584 if bundlerepo:
592 if bundlerepo:
585 bundlerepo.close()
593 bundlerepo.close()
586 if bundle:
594 if bundle:
587 os.unlink(bundle)
595 os.unlink(bundle)
588 other.close()
596 other.close()
589
597
590 return (localrepo, csets, cleanup)
598 return (localrepo, csets, cleanup)
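The core of this commit is the added `cgpart.seek(0, os.SEEK_SET)`: because `iterparts()` seeks each part to the end of its payload once the consumer loop resumes the generator, the retained changegroup part must be rewound afterwards so a later consumer starts reading at the payload's beginning. A minimal, runnable sketch of that pattern, using a hypothetical `FakePart`/`iterparts` stand-in rather than Mercurial's real bundle2 objects:

```python
# Illustrative sketch only: FakePart and iterparts() are simplified stand-ins
# for bundle2's part and iterparts(), not the real Mercurial API.
import os


class FakePart(object):
    """Minimal file-like part; seek(0, os.SEEK_SET) rewinds to payload start."""

    def __init__(self, type, payload):
        self.type = type
        self._payload = payload
        self._pos = 0

    def read(self, n=-1):
        if n < 0:
            n = len(self._payload) - self._pos
        data = self._payload[self._pos:self._pos + n]
        self._pos += len(data)
        return data

    def seek(self, offset, whence=os.SEEK_SET):
        if whence == os.SEEK_SET:
            self._pos = offset
        elif whence == os.SEEK_CUR:
            self._pos += offset
        else:
            self._pos = len(self._payload) + offset


def iterparts(parts):
    """Yield each part, then exhaust its payload -- mimicking how bundle2's
    iterparts() seeks to the end of a part after the consumer returns."""
    for part in parts:
        yield part
        part.read()  # generator resumes here; part is now at end of payload


def find_changegroup(parts):
    cgpart = None
    for part in iterparts(parts):
        if part.type == 'changegroup':
            if cgpart:
                raise NotImplementedError("can't process multiple changegroups")
            cgpart = part
    if cgpart is None:
        raise ValueError('No changegroups found')
    # The fix from the diff: rewind after the generator has advanced past the
    # part, so a later consumer sees the payload from its beginning.
    cgpart.seek(0, os.SEEK_SET)
    return cgpart
```

Without the final `seek`, the returned part would sit at end-of-payload and a subsequent `read()` would yield nothing, which is exactly the consumer breakage the commit message describes.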